From 72e142abcb4f4bb01bebdfdacef5e643b566b4a7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 3 Apr 2018 17:42:07 +0100 Subject: [PATCH 1/6] Move DynamoDB index tests to cover other storage backends. --- pkg/chunk/{aws/storage_client_test.go => storage/index_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pkg/chunk/{aws/storage_client_test.go => storage/index_test.go} (100%) diff --git a/pkg/chunk/aws/storage_client_test.go b/pkg/chunk/storage/index_test.go similarity index 100% rename from pkg/chunk/aws/storage_client_test.go rename to pkg/chunk/storage/index_test.go From e1c9078df3f92b57fd0dc219a7ce9a368dfced98 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 3 Apr 2018 17:32:26 +0100 Subject: [PATCH 2/6] Refactor index tests: - Factor out `forAllFixtures` function, to run a test over all the storage types. - Move some utils functions to pkg/chunk/storage/utils_test.go. --- pkg/chunk/storage/index_test.go | 335 +++++++++++------------ pkg/chunk/storage/storage_client_test.go | 133 +++------ pkg/chunk/storage/utils_test.go | 72 +++++ 3 files changed, 275 insertions(+), 265 deletions(-) create mode 100644 pkg/chunk/storage/utils_test.go diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index e8376b8e83c..a9117b44f14 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -1,4 +1,4 @@ -package aws +package storage import ( "context" @@ -6,192 +6,179 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/weaveworks/cortex/pkg/chunk" ) -func TestAWSStorageClient(t *testing.T) { - mockDB := newMockDynamoDB(0, 0) - client := storageClient{ - DynamoDB: mockDB, - queryRequestFn: mockDB.queryRequest, - batchGetItemRequestFn: mockDB.batchGetItemRequest, - batchWriteItemRequestFn: mockDB.batchWriteItemRequest, - } - batch := client.NewWriteBatch() - for i := 0; i < 30; i++ { - batch.Add("table", fmt.Sprintf("hash%d", i), []byte(fmt.Sprintf("range%d", i)), nil) - } - 
mockDB.createTable("table") - - err := client.BatchWrite(context.Background(), batch) - require.NoError(t, err) - - for i := 0; i < 30; i++ { - entry := chunk.IndexQuery{ - TableName: "table", - HashValue: fmt.Sprintf("hash%d", i), +func TestIndexBasic(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + // Write out 30 entries, into different hash and range values. + batch := client.NewWriteBatch() + for i := 0; i < 30; i++ { + batch.Add(tableName, fmt.Sprintf("hash%d", i), []byte(fmt.Sprintf("range%d", i)), nil) } - var have []chunk.IndexEntry - err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch, lastPage bool) bool { - for j := 0; j < read.Len(); j++ { - have = append(have, chunk.IndexEntry{ - RangeValue: read.RangeValue(j), - }) - } - return !lastPage - }) + err := client.BatchWrite(context.Background(), batch) require.NoError(t, err) - require.Equal(t, []chunk.IndexEntry{ - {RangeValue: []byte(fmt.Sprintf("range%d", i))}, - }, have) - } + + // Make sure we get back the correct entries by hash value. 
+ for i := 0; i < 30; i++ { + entry := chunk.IndexQuery{ + TableName: tableName, + HashValue: fmt.Sprintf("hash%d", i), + } + var have []chunk.IndexEntry + err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch, lastPage bool) bool { + for j := 0; j < read.Len(); j++ { + have = append(have, chunk.IndexEntry{ + RangeValue: read.RangeValue(j), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, []chunk.IndexEntry{ + {RangeValue: []byte(fmt.Sprintf("range%d", i))}, + }, have) + } + }) +} + +var entries = []chunk.IndexEntry{ + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:3"), + Value: []byte("30"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:1"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:2"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:3"), + Value: []byte("abc"), + }, } -func TestAWSStorageClientQueryPages(t *testing.T) { - entries := []chunk.IndexEntry{ - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("bar:1"), - Value: []byte("10"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("bar:2"), - Value: []byte("20"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("bar:3"), - Value: []byte("30"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("baz:1"), - Value: []byte("10"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: 
[]byte("baz:2"), - Value: []byte("20"), - }, - { - TableName: "table", - HashValue: "flip", - RangeValue: []byte("bar:1"), - Value: []byte("abc"), - }, - { - TableName: "table", - HashValue: "flip", - RangeValue: []byte("bar:2"), - Value: []byte("abc"), - }, - { - TableName: "table", - HashValue: "flip", - RangeValue: []byte("bar:3"), - Value: []byte("abc"), - }, - } +func TestQueryPages(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + batch := client.NewWriteBatch() + for _, entry := range entries { + batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } - tests := []struct { - name string - query chunk.IndexQuery - provisionedErr int - want []chunk.IndexEntry - }{ - { - "check HashValue only", - chunk.IndexQuery{ - TableName: "table", - HashValue: "flip", + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + tests := []struct { + name string + query chunk.IndexQuery + provisionedErr int + want []chunk.IndexEntry + }{ + { + "check HashValue only", + chunk.IndexQuery{ + TableName: "table", + HashValue: "flip", + }, + 0, + []chunk.IndexEntry{entries[5], entries[6], entries[7]}, }, - 0, - []chunk.IndexEntry{entries[5], entries[6], entries[7]}, - }, - { - "check RangeValueStart", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValueStart: []byte("bar:2"), + { + "check RangeValueStart", + chunk.IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValueStart: []byte("bar:2"), + }, + 0, + []chunk.IndexEntry{entries[1], entries[2], entries[3], entries[4]}, }, - 0, - []chunk.IndexEntry{entries[1], entries[2], entries[3], entries[4]}, - }, - { - "check RangeValuePrefix", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValuePrefix: []byte("baz:"), + { + "check RangeValuePrefix", + chunk.IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("baz:"), + }, + 0, + []chunk.IndexEntry{entries[3], entries[4]}, }, 
- 0, - []chunk.IndexEntry{entries[3], entries[4]}, - }, - { - "check ValueEqual", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValuePrefix: []byte("bar"), - ValueEqual: []byte("20"), + { + "check ValueEqual", + chunk.IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEqual: []byte("20"), + }, + 0, + []chunk.IndexEntry{entries[1]}, }, - 0, - []chunk.IndexEntry{entries[1]}, - }, - { - "check retry logic", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValuePrefix: []byte("bar"), - ValueEqual: []byte("20"), + { + "check retry logic", + chunk.IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEqual: []byte("20"), + }, + 2, + []chunk.IndexEntry{entries[1]}, }, - 2, - []chunk.IndexEntry{entries[1]}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - dynamoDB := newMockDynamoDB(0, tt.provisionedErr) - client := storageClient{ - DynamoDB: dynamoDB, - queryRequestFn: dynamoDB.queryRequest, - batchGetItemRequestFn: dynamoDB.batchGetItemRequest, - batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest, - } - - batch := client.NewWriteBatch() - for _, entry := range entries { - batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) - } - dynamoDB.createTable("table") - - err := client.BatchWrite(context.Background(), batch) - require.NoError(t, err) + } - var have []chunk.IndexEntry - err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch, lastPage bool) bool { - for i := 0; i < read.Len(); i++ { - have = append(have, chunk.IndexEntry{ - TableName: tt.query.TableName, - HashValue: tt.query.HashValue, - RangeValue: read.RangeValue(i), - Value: read.Value(i), - }) - } - return !lastPage + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var have []chunk.IndexEntry + err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch, 
lastPage bool) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, chunk.IndexEntry{ + TableName: tt.query.TableName, + HashValue: tt.query.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) }) - require.NoError(t, err) - require.Equal(t, tt.want, have) - }) - } + } + }) } diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index 7691ae41aec..34457ff992f 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -12,106 +12,57 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/cortex/pkg/chunk" - "github.com/weaveworks/cortex/pkg/chunk/aws" - "github.com/weaveworks/cortex/pkg/chunk/gcp" - promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" ) -var fixtures = append(aws.Fixtures, gcp.Fixtures...) - -func TestStoreChunks(t *testing.T) { - for _, fixture := range fixtures { - t.Run(fixture.Name(), func(t *testing.T) { - storageClient, tableClient, schemaConfig, err := fixture.Clients() - require.NoError(t, err) - defer fixture.Teardown() - - tableManager, err := chunk.NewTableManager(schemaConfig, 12*time.Hour, tableClient) - require.NoError(t, err) - - err = tableManager.SyncTables(context.Background()) - require.NoError(t, err) - - testStorageClientChunks(t, storageClient) - }) - } -} - -func testStorageClientChunks(t *testing.T, client chunk.StorageClient) { - const batchSize = 50 - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() +func TestChunksBasic(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + const batchSize = 50 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Write a few batches of chunks. 
+ written := []string{} + for i := 0; i < 50; i++ { + chunks := []chunk.Chunk{} + for j := 0; j < batchSize; j++ { + chunk := dummyChunkFor(model.Now(), model.Metric{ + model.MetricNameLabel: "foo", + "index": model.LabelValue(strconv.Itoa(i*batchSize + j)), + }) + chunks = append(chunks, chunk) + _, err := chunk.Encode() // Need to encode it, side effect calculates crc + require.NoError(t, err) + written = append(written, chunk.ExternalKey()) + } - // Write a few batches of chunks. - written := []string{} - for i := 0; i < 50; i++ { - chunks := []chunk.Chunk{} - for j := 0; j < batchSize; j++ { - chunk := dummyChunkFor(model.Now(), model.Metric{ - model.MetricNameLabel: "foo", - "index": model.LabelValue(strconv.Itoa(i*batchSize + j)), - }) - chunks = append(chunks, chunk) - _, err := chunk.Encode() // Need to encode it, side effect calculates crc + err := client.PutChunks(ctx, chunks) require.NoError(t, err) - written = append(written, chunk.ExternalKey()) } - err := client.PutChunks(ctx, chunks) - require.NoError(t, err) - } - - // Get a few batches of chunks. - for i := 0; i < 50; i++ { - keysToGet := map[string]struct{}{} - chunksToGet := []chunk.Chunk{} - for len(chunksToGet) < batchSize { - key := written[rand.Intn(len(written))] - if _, ok := keysToGet[key]; ok { - continue + // Get a few batches of chunks. 
+ for i := 0; i < 50; i++ { + keysToGet := map[string]struct{}{} + chunksToGet := []chunk.Chunk{} + for len(chunksToGet) < batchSize { + key := written[rand.Intn(len(written))] + if _, ok := keysToGet[key]; ok { + continue + } + keysToGet[key] = struct{}{} + chunk, err := chunk.ParseExternalKey(userID, key) + require.NoError(t, err) + chunksToGet = append(chunksToGet, chunk) } - keysToGet[key] = struct{}{} - chunk, err := chunk.ParseExternalKey(userID, key) - require.NoError(t, err) - chunksToGet = append(chunksToGet, chunk) - } - chunksWeGot, err := client.GetChunks(ctx, chunksToGet) - require.NoError(t, err) - require.Equal(t, len(chunksToGet), len(chunksWeGot)) + chunksWeGot, err := client.GetChunks(ctx, chunksToGet) + require.NoError(t, err) + require.Equal(t, len(chunksToGet), len(chunksWeGot)) - sort.Sort(chunk.ByKey(chunksToGet)) - sort.Sort(chunk.ByKey(chunksWeGot)) - for j := 0; j < len(chunksWeGot); j++ { - require.Equal(t, chunksToGet[i].ExternalKey(), chunksWeGot[i].ExternalKey(), strconv.Itoa(i)) + sort.Sort(chunk.ByKey(chunksToGet)) + sort.Sort(chunk.ByKey(chunksWeGot)) + for j := 0; j < len(chunksWeGot); j++ { + require.Equal(t, chunksToGet[i].ExternalKey(), chunksWeGot[i].ExternalKey(), strconv.Itoa(i)) + } } - } -} - -const userID = "userID" - -func dummyChunk(now model.Time) chunk.Chunk { - return dummyChunkFor(now, model.Metric{ - model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", }) } - -func dummyChunkFor(now model.Time, metric model.Metric) chunk.Chunk { - cs, _ := promchunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) - chunk := chunk.NewChunk( - userID, - metric.Fingerprint(), - metric, - cs[0], - now.Add(-time.Hour), - now, - ) - // Force checksum calculation. 
- _, err := chunk.Encode() - if err != nil { - panic(err) - } - return chunk -} diff --git a/pkg/chunk/storage/utils_test.go b/pkg/chunk/storage/utils_test.go new file mode 100644 index 00000000000..6147e93c872 --- /dev/null +++ b/pkg/chunk/storage/utils_test.go @@ -0,0 +1,72 @@ +package storage + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/aws" + "github.com/weaveworks/cortex/pkg/chunk/gcp" + promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" +) + +const ( + userID = "userID" + tableName = "table" +) + +var fixtures = append(aws.Fixtures, gcp.Fixtures...) + +type storageClientTest func(*testing.T, chunk.StorageClient) + +func forAllFixtures(t *testing.T, storageClientTest storageClientTest) { + for _, fixture := range fixtures { + t.Run(fixture.Name(), func(t *testing.T) { + storageClient, tableClient, schemaConfig, err := fixture.Clients() + require.NoError(t, err) + defer fixture.Teardown() + + tableManager, err := chunk.NewTableManager(schemaConfig, 12*time.Hour, tableClient) + require.NoError(t, err) + + err = tableManager.SyncTables(context.Background()) + require.NoError(t, err) + + err = tableClient.CreateTable(context.Background(), chunk.TableDesc{ + Name: tableName, + }) + require.NoError(t, err) + + storageClientTest(t, storageClient) + }) + } +} + +func dummyChunk(now model.Time) chunk.Chunk { + return dummyChunkFor(now, model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + "toms": "code", + }) +} + +func dummyChunkFor(now model.Time, metric model.Metric) chunk.Chunk { + cs, _ := promchunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) + chunk := chunk.NewChunk( + userID, + metric.Fingerprint(), + metric, + cs[0], + now.Add(-time.Hour), + now, + ) + // Force checksum calculation. 
+ _, err := chunk.Encode() + if err != nil { + panic(err) + } + return chunk +} From c608d8844a28acdcdadbfba9c263bdcba71f99c9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 3 Apr 2018 17:54:09 +0100 Subject: [PATCH 3/6] Make BigTable storage pass tests: Implement client-side filtering of results; server doesn't offer this facility in a consumable fashion. --- pkg/chunk/gcp/storage_client.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index d3666fc474a..a73ba6176b5 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -1,6 +1,7 @@ package gcp import ( + "bytes" "context" "flag" "fmt" @@ -116,6 +117,17 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, table := s.client.Open(query.TableName) var rowRange bigtable.RowRange + + /* BigTable only seems to support regex match on cell values, so doing it + client side for now + readOpts := []bigtable.ReadOption{ + bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)), + } + if query.ValueEqual != nil { + readOpts = append(readOpts, bigtable.RowFilter(bigtable.ValueFilter(string(query.ValueEqual)))) + } + */ + if len(query.RangeValuePrefix) > 0 { rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) } else if len(query.RangeValueStart) > 0 { @@ -125,8 +137,11 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, } err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { - return callback(bigtableReadBatch(r), false) - }, bigtable.RowFilter(bigtable.FamilyFilter(columnFamily))) + if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { + return callback(bigtableReadBatch(r), false) + } + return true + }) if err != nil { sp.LogFields(otlog.String("error", err.Error())) return errors.WithStack(err) From eec3d90bd378c2cc135f8df18334b63068420474 
Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 2 Apr 2018 17:00:26 +0100 Subject: [PATCH 4/6] Don't pass QueryPage callback lastPage; its a horrible interface, and causes no ends of issues. Instead, just have the DynamoDB implementation work it out for itself. --- pkg/chunk/aws/storage_client.go | 7 +++++-- pkg/chunk/cassandra/storage_client.go | 4 ++-- pkg/chunk/chunk_store.go | 4 ++-- pkg/chunk/gcp/storage_client.go | 4 ++-- pkg/chunk/inmemory_storage_client.go | 4 ++-- pkg/chunk/storage/index_test.go | 8 ++++---- pkg/chunk/storage_client.go | 2 +- 7 files changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 1c8138931d5..b633104e5f4 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -275,7 +275,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e return backoff.Err() } -func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() @@ -332,12 +332,15 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c return err } - if getNextPage := callback(response, !page.HasNextPage()); !getNextPage { + if !callback(response) { if err != nil { return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, page.Error()) } return nil } + if !page.HasNextPage() { + return nil + } } return nil } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index fe180b856ea..73079a1ce12 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ 
b/pkg/chunk/cassandra/storage_client.go @@ -116,7 +116,7 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return s.session.ExecuteBatch(cassandraBatch.b.WithContext(ctx)) } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { var q *gocql.Query if len(query.RangeValuePrefix) > 0 { q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?", query.TableName), @@ -137,7 +137,7 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { return err } - if callback(b, false) { + if callback(b) { return nil } } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 6dedf14d4b0..d277c9c9522 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -466,7 +466,7 @@ func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry - if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch, lastPage bool) (shouldContinue bool) { + if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) { for i := 0; i < resp.Len(); i++ { entries = append(entries, IndexEntry{ TableName: query.TableName, @@ -475,7 +475,7 @@ func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I Value: resp.Value(i), }) } - return !lastPage + return true }); err != nil { level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err) return nil, err diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index 
a73ba6176b5..b2d256b760a 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -110,7 +110,7 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() @@ -138,7 +138,7 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { - return callback(bigtableReadBatch(r), false) + return callback(bigtableReadBatch(r)) } return true }) diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 443c9f12b08..3dea4728db9 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -158,7 +158,7 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { } // QueryPages implements StorageClient. 
-func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error { logger := util.WithContext(ctx, util.Logger) m.mtx.RLock() defer m.mtx.RUnlock() @@ -232,7 +232,7 @@ func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback result = append(result, item) } - callback(result, true) + callback(result) return nil } diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index a9117b44f14..9c311bb6590 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -26,13 +26,13 @@ func TestIndexBasic(t *testing.T) { HashValue: fmt.Sprintf("hash%d", i), } var have []chunk.IndexEntry - err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch, lastPage bool) bool { + err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch) bool { for j := 0; j < read.Len(); j++ { have = append(have, chunk.IndexEntry{ RangeValue: read.RangeValue(j), }) } - return !lastPage + return true }) require.NoError(t, err) require.Equal(t, []chunk.IndexEntry{ @@ -165,7 +165,7 @@ func TestQueryPages(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var have []chunk.IndexEntry - err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch, lastPage bool) bool { + err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool { for i := 0; i < read.Len(); i++ { have = append(have, chunk.IndexEntry{ TableName: tt.query.TableName, @@ -174,7 +174,7 @@ func TestQueryPages(t *testing.T) { Value: read.Value(i), }) } - return !lastPage + return true }) require.NoError(t, err) require.Equal(t, tt.want, have) diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index 
7ba0eace983..662fe62afe1 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -9,7 +9,7 @@ type StorageClient interface { BatchWrite(context.Context, WriteBatch) error // For the read path. - QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error + QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error // For storing and retrieving chunks. PutChunks(ctx context.Context, chunks []Chunk) error From 71e99a556e9f56978a9442330a5be0b492576fc0 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sun, 1 Apr 2018 14:50:15 +0100 Subject: [PATCH 5/6] Add Cassandra fixture. Doesn't run by default, as it depends on an external Cassandra cluster. See pkg/chunk/cassandra/fixtures.go for details on how to run. --- pkg/chunk/cassandra/fixtures.go | 80 +++++++++++++++++++++++++++++++++ pkg/chunk/storage/utils_test.go | 9 +++- 2 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 pkg/chunk/cassandra/fixtures.go diff --git a/pkg/chunk/cassandra/fixtures.go b/pkg/chunk/cassandra/fixtures.go new file mode 100644 index 00000000000..a70132b6072 --- /dev/null +++ b/pkg/chunk/cassandra/fixtures.go @@ -0,0 +1,80 @@ +package cassandra + +import ( + "context" + "flag" + "os" + + "github.com/prometheus/common/model" + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/util" +) + +// GOCQL doesn't provide nice mocks, so we use a real Cassandra instance. 
+// To enable these tests: +// $ docker run --name cassandra --rm -p 9042:9042 cassandra:3.11 +// $ CASSANDRA_TEST_ADDRESSES=localhost:9042 go test ./pkg/chunk/storage + +type fixture struct { + name string + storageClient chunk.StorageClient + tableClient chunk.TableClient + schemaConfig chunk.SchemaConfig +} + +func (f fixture) Name() string { + return f.name +} + +func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) { + return f.storageClient, f.tableClient, f.schemaConfig, nil +} + +func (f fixture) Teardown() error { + return nil +} + +// Fixtures for unit testing Cassandra integration. +func Fixtures() ([]chunk.Fixture, error) { + addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES") + if addresses == "" { + return nil, nil + } + + cfg := Config{ + addresses: addresses, + keyspace: "test", + consistency: "QUORUM", + replicationFactor: 1, + } + + // Get a SchemaConfig with the defaults. + flagSet := flag.NewFlagSet("flags", flag.PanicOnError) + schemaConfig := chunk.SchemaConfig{} + schemaConfig.RegisterFlags(flagSet) + err := flagSet.Parse([]string{}) + if err != nil { + return nil, err + } + schemaConfig.IndexTables.From = util.NewDayValue(model.Now()) + schemaConfig.ChunkTables.From = util.NewDayValue(model.Now()) + + storageClient, err := NewStorageClient(cfg, schemaConfig) + if err != nil { + return nil, err + } + + tableClient, err := NewTableClient(context.Background(), cfg) + if err != nil { + return nil, err + } + + return []chunk.Fixture{ + fixture{ + name: "Cassandra", + storageClient: storageClient, + tableClient: tableClient, + schemaConfig: schemaConfig, + }, + }, nil +} diff --git a/pkg/chunk/storage/utils_test.go b/pkg/chunk/storage/utils_test.go index 6147e93c872..3cfb9e7ba78 100644 --- a/pkg/chunk/storage/utils_test.go +++ b/pkg/chunk/storage/utils_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/aws" + 
"github.com/weaveworks/cortex/pkg/chunk/cassandra" "github.com/weaveworks/cortex/pkg/chunk/gcp" promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" ) @@ -18,11 +19,15 @@ const ( tableName = "table" ) -var fixtures = append(aws.Fixtures, gcp.Fixtures...) - type storageClientTest func(*testing.T, chunk.StorageClient) func forAllFixtures(t *testing.T, storageClientTest storageClientTest) { + fixtures := append(aws.Fixtures, gcp.Fixtures...) + + cassandraFixtures, err := cassandra.Fixtures() + require.NoError(t, err) + fixtures = append(fixtures, cassandraFixtures...) + for _, fixture := range fixtures { t.Run(fixture.Name(), func(t *testing.T) { storageClient, tableClient, schemaConfig, err := fixture.Clients() From 87578f02856c0bff254d1ac1c13af70c8add8e55 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 3 Apr 2018 17:38:54 +0100 Subject: [PATCH 6/6] Make Cassandra pass the tests. - Add errors.WithStack on all error coming back from Cassandra. - Implement server-side value filtering. - Correctly implement prefix queries. - Don't call the test table "table", as this is a reserved word in CQL. - Don't use Cassandra batches for writes, they don't do what you think they do. - Correctly honor QueryPage callback value <--- this was the cause of the bug I was chasing... 
--- pkg/chunk/cassandra/storage_client.go | 97 ++++++++++++++++++--------- pkg/chunk/cassandra/table_client.go | 8 ++- pkg/chunk/storage/index_test.go | 26 +++---- pkg/chunk/storage/utils_test.go | 2 +- 4 files changed, 84 insertions(+), 49 deletions(-) diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 73079a1ce12..e24c5d09d87 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -8,6 +8,7 @@ import ( "time" "github.com/gocql/gocql" + "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" ) @@ -35,11 +36,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) session() (*gocql.Session, error) { consistency, err := gocql.ParseConsistencyWrapper(cfg.consistency) if err != nil { - return nil, err + return nil, errors.WithStack(err) } if err := cfg.createKeyspace(); err != nil { - return nil, err + return nil, errors.WithStack(err) } cluster := gocql.NewCluster(strings.Split(cfg.addresses, ",")...) @@ -58,17 +59,18 @@ func (cfg *Config) createKeyspace() error { cluster.Timeout = 20 * time.Second session, err := cluster.CreateSession() if err != nil { - return err + return errors.WithStack(err) } defer session.Close() - return session.Query(fmt.Sprintf( + err = session.Query(fmt.Sprintf( `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %d }`, cfg.keyspace, cfg.replicationFactor)).Exec() + return errors.WithStack(err) } // storageClient implements chunk.storageClient for GCP. 
@@ -82,7 +84,7 @@ type storageClient struct { func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { session, err := cfg.session() if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &storageClient{ @@ -96,37 +98,66 @@ func (s *storageClient) Close() { s.session.Close() } +// Cassandra batching isn't really useful in this case, its more to do multiple +// atomic writes. Therefore we just do a bunch of writes in parallel. type writeBatch struct { - b *gocql.Batch + entries []chunk.IndexEntry } func (s *storageClient) NewWriteBatch() chunk.WriteBatch { - return writeBatch{ - b: s.session.NewBatch(gocql.UnloggedBatch), - } + return &writeBatch{} } -func (b writeBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { - b.b.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)", tableName), - hashValue, rangeValue, value) +func (b *writeBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { + b.entries = append(b.entries, chunk.IndexEntry{ + TableName: tableName, + HashValue: hashValue, + RangeValue: rangeValue, + Value: value, + }) } func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { - cassandraBatch := batch.(writeBatch) - return s.session.ExecuteBatch(cassandraBatch.b.WithContext(ctx)) + b := batch.(*writeBatch) + + for _, entry := range b.entries { + err := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)", + entry.TableName), entry.HashValue, entry.RangeValue, entry.Value).WithContext(ctx).Exec() + if err != nil { + return errors.WithStack(err) + } + } + + return nil } func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { var q *gocql.Query - if len(query.RangeValuePrefix) > 0 { - q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? 
AND range >= ?", query.TableName), - query.HashValue, query.RangeValuePrefix) - } else if len(query.RangeValueStart) > 0 { - q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?", query.TableName), - query.HashValue, query.RangeValueStart) - } else { - q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?", query.TableName), - query.HashValue) + + switch { + case len(query.RangeValuePrefix) > 0 && query.ValueEqual == nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ?", + query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff')) + + case len(query.RangeValuePrefix) > 0 && query.ValueEqual != nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ? AND value = ? ALLOW FILTERING", + query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff'), query.ValueEqual) + + case len(query.RangeValueStart) > 0 && query.ValueEqual == nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?", + query.TableName), query.HashValue, query.RangeValueStart) + + case len(query.RangeValueStart) > 0 && query.ValueEqual != nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND value = ? ALLOW FILTERING", + query.TableName), query.HashValue, query.RangeValueStart, query.ValueEqual) + + case query.ValueEqual == nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?", + query.TableName), query.HashValue) + + case query.ValueEqual != nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND value = ? 
ALLOW FILTERING", + query.TableName), query.HashValue, query.ValueEqual) } iter := q.WithContext(ctx).Iter() @@ -135,13 +166,13 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, for scanner.Next() { var b readBatch if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { - return err + return errors.WithStack(err) } - if callback(b) { + if !callback(b) { return nil } } - return scanner.Err() + return errors.WithStack(scanner.Err()) } // readBatch represents a batch of rows read from Cassandra. @@ -171,22 +202,24 @@ func (b readBatch) Value(index int) []byte { } func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { - b := gocql.NewBatch(gocql.UnloggedBatch).WithContext(ctx) - for i := range chunks { // Encode the chunk first - checksum is calculated as a side effect. buf, err := chunks[i].Encode() if err != nil { - return err + return errors.WithStack(err) } key := chunks[i].ExternalKey() tableName := s.schemaCfg.ChunkTables.TableFor(chunks[i].From) // Must provide a range key, even though its not useds - hence 0x00. - b.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)", tableName), key, buf) + q := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)", + tableName), key, buf) + if err := q.WithContext(ctx).Exec(); err != nil { + return errors.WithStack(err) + } } - return s.session.ExecuteBatch(b) + return nil } func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { @@ -223,7 +256,7 @@ func (s *storageClient) getChunk(ctx context.Context, input chunk.Chunk) (chunk. var buf []byte if err := s.session.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()). 
WithContext(ctx).Scan(&buf); err != nil { - return input, err + return input, errors.WithStack(err) } decodeContext := chunk.NewDecodeContext() err := input.Decode(decodeContext, buf) diff --git a/pkg/chunk/cassandra/table_client.go b/pkg/chunk/cassandra/table_client.go index 3feafd0047a..45865d2bba8 100644 --- a/pkg/chunk/cassandra/table_client.go +++ b/pkg/chunk/cassandra/table_client.go @@ -6,6 +6,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/gocql/gocql" + "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" ) @@ -19,7 +20,7 @@ type tableClient struct { func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { session, err := cfg.session() if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &tableClient{ cfg: cfg, @@ -30,7 +31,7 @@ func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { md, err := c.session.KeyspaceMetadata(c.cfg.keyspace) if err != nil { - return nil, err + return nil, errors.WithStack(err) } result := []string{} for name := range md.Tables { @@ -40,13 +41,14 @@ func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { } func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error { - return c.session.Query(fmt.Sprintf(` + err := c.session.Query(fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( hash text, range blob, value blob, PRIMARY KEY (hash, range) )`, desc.Name)).WithContext(ctx).Exec() + return errors.WithStack(err) } func (c *tableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, status string, err error) { diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index 9c311bb6590..b6cdc0f172c 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -44,49 +44,49 @@ func TestIndexBasic(t *testing.T) { var entries = []chunk.IndexEntry{ { - 
TableName: "table", + TableName: tableName, HashValue: "foo", RangeValue: []byte("bar:1"), Value: []byte("10"), }, { - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValue: []byte("bar:2"), Value: []byte("20"), }, { - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValue: []byte("bar:3"), Value: []byte("30"), }, { - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValue: []byte("baz:1"), Value: []byte("10"), }, { - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValue: []byte("baz:2"), Value: []byte("20"), }, { - TableName: "table", + TableName: tableName, HashValue: "flip", RangeValue: []byte("bar:1"), Value: []byte("abc"), }, { - TableName: "table", + TableName: tableName, HashValue: "flip", RangeValue: []byte("bar:2"), Value: []byte("abc"), }, { - TableName: "table", + TableName: tableName, HashValue: "flip", RangeValue: []byte("bar:3"), Value: []byte("abc"), @@ -112,7 +112,7 @@ func TestQueryPages(t *testing.T) { { "check HashValue only", chunk.IndexQuery{ - TableName: "table", + TableName: tableName, HashValue: "flip", }, 0, @@ -121,7 +121,7 @@ func TestQueryPages(t *testing.T) { { "check RangeValueStart", chunk.IndexQuery{ - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValueStart: []byte("bar:2"), }, @@ -131,7 +131,7 @@ func TestQueryPages(t *testing.T) { { "check RangeValuePrefix", chunk.IndexQuery{ - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValuePrefix: []byte("baz:"), }, @@ -141,7 +141,7 @@ func TestQueryPages(t *testing.T) { { "check ValueEqual", chunk.IndexQuery{ - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValuePrefix: []byte("bar"), ValueEqual: []byte("20"), @@ -152,7 +152,7 @@ func TestQueryPages(t *testing.T) { { "check retry logic", chunk.IndexQuery{ - TableName: "table", + TableName: tableName, HashValue: "foo", RangeValuePrefix: []byte("bar"), ValueEqual: []byte("20"), diff --git 
a/pkg/chunk/storage/utils_test.go b/pkg/chunk/storage/utils_test.go index 3cfb9e7ba78..989a6222fc5 100644 --- a/pkg/chunk/storage/utils_test.go +++ b/pkg/chunk/storage/utils_test.go @@ -16,7 +16,7 @@ import ( const ( userID = "userID" - tableName = "table" + tableName = "test" ) type storageClientTest func(*testing.T, chunk.StorageClient)