From c792da8f416677a3e66f41d94036d8a9a5803c79 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 16 Mar 2018 13:54:54 +0530 Subject: [PATCH 1/3] Use BigTable Columns as range keys See https://github.com/weaveworks/cortex/issues/714 for motivation. One thing to notice is that unlike existing schemas, here the structure of the data being stored changed. This caused some backwards incompatible changes as the `StorageClient` interface doesn't have any time information in the `QueryPages` making it impossible to know if we need to range over the row-key or the columns to retrieve the data. For maintaining backwards compat, use `chunk.storage-client=gcpv1`. Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/gcp/fixtures.go | 27 ++- pkg/chunk/gcp/storage_client.go | 218 ++++++++++++++++++----- pkg/chunk/storage/factory.go | 4 +- pkg/chunk/storage/storage_client_test.go | 147 +++++++++++++++ pkg/chunk/storage_client.go | 4 +- 5 files changed, 349 insertions(+), 51 deletions(-) diff --git a/pkg/chunk/gcp/fixtures.go b/pkg/chunk/gcp/fixtures.go index 00f8ead45e8..3e55ab06a05 100644 --- a/pkg/chunk/gcp/fixtures.go +++ b/pkg/chunk/gcp/fixtures.go @@ -21,6 +21,8 @@ const ( type fixture struct { srv *bttest.Server name string + + version int } func (f *fixture) Name() string { @@ -59,13 +61,23 @@ func (f *fixture) Clients() ( Prefix: "chunks", }, } - sClient = &storageClient{ - schemaCfg: schemaConfig, - client: client, - } tClient = &tableClient{ client: adminClient, } + + if f.version == 1 { + sClient = &storageClientV1{ + storageClientV2{ + schemaCfg: schemaConfig, + client: client, + }, + } + } else { + sClient = &storageClientV2{ + schemaCfg: schemaConfig, + client: client, + } + } return } @@ -77,6 +89,11 @@ func (f *fixture) Teardown() error { // Fixtures for unit testing GCP storage. 
var Fixtures = []chunk.Fixture{ &fixture{ - name: "GCP", + name: "GCPv2", + version: 2, + }, + &fixture{ + name: "GCPv1", + version: 1, }, } diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index d3666fc474a..922b7c3e777 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -34,8 +34,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.instance, "bigtable.instance", "", "Bigtable instance ID.") } -// storageClient implements chunk.storageClient for GCP. -type storageClient struct { +type storageClientV1 struct { + storageClientV2 +} + +// storageClientV2 implements chunk.storageClientV2 for GCP. +type storageClientV2 struct { cfg Config schemaCfg chunk.SchemaConfig client *bigtable.Client @@ -43,28 +47,48 @@ type storageClient struct { // NewStorageClient returns a new StorageClient. func NewStorageClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { + return NewStorageClientV2(ctx, cfg, schemaCfg) +} + +// NewStorageClientV1 returns a new v1 StorageClient. +func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { + client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...) + if err != nil { + return nil, err + } + return &storageClientV1{ + storageClientV2{ + cfg: cfg, + schemaCfg: schemaCfg, + client: client, + }, + }, nil +} + +// NewStorageClientV2 returns a new v2 StorageClient. +func NewStorageClientV2(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...) 
if err != nil { return nil, err } - return &storageClient{ + return &storageClientV2{ cfg: cfg, schemaCfg: schemaCfg, client: client, }, nil } -func (s *storageClient) NewWriteBatch() chunk.WriteBatch { - return bigtableWriteBatch{ +func (s *storageClientV2) NewWriteBatch() chunk.WriteBatch { + return bigtableWriteBatchV2{ tables: map[string]map[string]*bigtable.Mutation{}, } } -type bigtableWriteBatch struct { +type bigtableWriteBatchV2 struct { tables map[string]map[string]*bigtable.Mutation } -func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { +func (b bigtableWriteBatchV2) Add(tableName, hashValue string, rangeValue []byte, value []byte) { rows, ok := b.tables[tableName] if !ok { rows = map[string]*bigtable.Mutation{} @@ -73,18 +97,19 @@ func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, // TODO the hashValue should actually be hashed - but I have data written in // this format, so we need to do a proper migration. 
- rowKey := hashValue + separator + string(rangeValue) + // TODO TODO ^ + rowKey := hashValue mutation, ok := rows[rowKey] if !ok { mutation = bigtable.NewMutation() rows[rowKey] = mutation } - mutation.Set(columnFamily, column, 0, value) + mutation.Set(columnFamily, string(rangeValue), 0, value) } -func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { - bigtableBatch := batch.(bigtableWriteBatch) +func (s *storageClientV2) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { + bigtableBatch := batch.(bigtableWriteBatchV2) for tableName, rows := range bigtableBatch.tables { table := s.client.Open(tableName) @@ -109,61 +134,57 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (s *storageClientV2) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() table := s.client.Open(query.TableName) - var rowRange bigtable.RowRange + rOpts := []bigtable.ReadOption{ + bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)), + } + if len(query.RangeValuePrefix) > 0 { - rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) + rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnFilter(string(query.RangeValuePrefix)+".*"))) // TODO: Check again and anchor. 
} else if len(query.RangeValueStart) > 0 { - rowRange = bigtable.NewRange(query.HashValue+separator+string(query.RangeValueStart), query.HashValue+separator+string('\xff')) - } else { - rowRange = bigtable.PrefixRange(query.HashValue + separator) + rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValueStart), ""))) } - err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { - return callback(bigtableReadBatch(r), false) - }, bigtable.RowFilter(bigtable.FamilyFilter(columnFamily))) + r, err := table.ReadRow(ctx, query.HashValue, rOpts...) if err != nil { sp.LogFields(otlog.String("error", err.Error())) return errors.WithStack(err) } + + val, ok := r[columnFamily] + if !ok { + panic("bad response from bigtable, columnFamily missing") + } + + for i := range val { + val[i].Column = strings.TrimPrefix(val[i].Column, columnFamily+":") + // TODO: Hacky hacky ^ + } + callback(bigtableReadBatchV2(val), true) // TODO: Check true or false. return nil } -// bigtableReadBatch represents a batch of rows read from Bigtable. As the -// bigtable interface gives us rows one-by-one, a batch always only contains -// a single row. -type bigtableReadBatch bigtable.Row +// bigtableReadBatchV2 represents a batch of values read from Bigtable. 
+type bigtableReadBatchV2 []bigtable.ReadItem -func (bigtableReadBatch) Len() int { - return 1 +func (b bigtableReadBatchV2) Len() int { + return len(b) } -func (b bigtableReadBatch) RangeValue(index int) []byte { - if index != 0 { - panic("index != 0") - } - // String before the first separator is the hashkey - parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) - return []byte(parts[1]) +func (b bigtableReadBatchV2) RangeValue(index int) []byte { + return []byte(b[index].Column) } -func (b bigtableReadBatch) Value(index int) []byte { - if index != 0 { - panic("index != 0") - } - cf, ok := b[columnFamily] - if !ok || len(cf) != 1 { - panic("bad response from bigtable") - } - return cf[0].Value +func (b bigtableReadBatchV2) Value(index int) []byte { + return b[index].Value } -func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { +func (s *storageClientV2) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { keys := map[string][]string{} muts := map[string][]*bigtable.Mutation{} @@ -197,7 +218,7 @@ func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err return nil } -func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { +func (s *storageClientV2) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks") defer sp.Finish() sp.LogFields(otlog.Int("chunks requested", len(input))) @@ -276,3 +297,112 @@ func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c return output, nil } + +func (s *storageClientV1) NewWriteBatch() chunk.WriteBatch { + return bigtableWriteBatchV1{ + tables: map[string]map[string]*bigtable.Mutation{}, + } +} + +type bigtableWriteBatchV1 struct { + tables map[string]map[string]*bigtable.Mutation +} + +func (b bigtableWriteBatchV1) Add(tableName, hashValue string, rangeValue []byte, value []byte) { + rows, ok := b.tables[tableName] + if 
!ok { + rows = map[string]*bigtable.Mutation{} + b.tables[tableName] = rows + } + + // TODO the hashValue should actually be hashed - but I have data written in + // this format, so we need to do a proper migration. + rowKey := hashValue + separator + string(rangeValue) + mutation, ok := rows[rowKey] + if !ok { + mutation = bigtable.NewMutation() + rows[rowKey] = mutation + } + + mutation.Set(columnFamily, column, 0, value) +} + +func (s *storageClientV1) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { + bigtableBatch := batch.(bigtableWriteBatchV1) + + for tableName, rows := range bigtableBatch.tables { + table := s.client.Open(tableName) + rowKeys := make([]string, 0, len(rows)) + muts := make([]*bigtable.Mutation, 0, len(rows)) + for rowKey, mut := range rows { + rowKeys = append(rowKeys, rowKey) + muts = append(muts, mut) + } + + errs, err := table.ApplyBulk(ctx, rowKeys, muts) + if err != nil { + return err + } + for _, err := range errs { + if err != nil { + return err + } + } + } + + return nil +} + +func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { + sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) + defer sp.Finish() + + table := s.client.Open(query.TableName) + + var rowRange bigtable.RowRange + if len(query.RangeValuePrefix) > 0 { + rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) + } else if len(query.RangeValueStart) > 0 { + rowRange = bigtable.NewRange(query.HashValue+separator+string(query.RangeValueStart), query.HashValue+separator+string('\xff')) + } else { + rowRange = bigtable.PrefixRange(query.HashValue + separator) + } + + err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { + return callback(bigtableReadBatchV1(r), false) + }, 
bigtable.RowFilter(bigtable.FamilyFilter(columnFamily))) + if err != nil { + sp.LogFields(otlog.String("error", err.Error())) + return errors.WithStack(err) + } + return nil +} + +// bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the +// bigtable interface gives us rows one-by-one, a batch always only contains +// a single row. +type bigtableReadBatchV1 bigtable.Row + +func (bigtableReadBatchV1) Len() int { + return 1 +} + +func (b bigtableReadBatchV1) RangeValue(index int) []byte { + if index != 0 { + panic("index != 0") + } + // String before the first separator is the hashkey + parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) + return []byte(parts[1]) +} + +func (b bigtableReadBatchV1) Value(index int) []byte { + if index != 0 { + panic("index != 0") + } + cf, ok := b[columnFamily] + if !ok || len(cf) != 1 { + panic("bad response from bigtable") + } + return cf[0].Value +} diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index b48b0d2f502..14d34c234f8 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -24,7 +24,7 @@ type Config struct { // RegisterFlags adds the flags required to configure this flag set. 
func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - flag.StringVar(&cfg.StorageClient, "chunk.storage-client", "aws", "Which storage client to use (aws, gcp, cassandra, inmemory).") + flag.StringVar(&cfg.StorageClient, "chunk.storage-client", "aws", "Which storage client to use (aws, gcp, gcpv1, cassandra, inmemory).") cfg.AWSStorageConfig.RegisterFlags(f) cfg.GCPStorageConfig.RegisterFlags(f) cfg.CassandraStorageConfig.RegisterFlags(f) @@ -41,6 +41,8 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageCl level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } return aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg) + case "gcpv1": + return gcp.NewStorageClientV1(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "gcp": return gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index 7691ae41aec..83cf34dbd6e 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -37,6 +37,24 @@ func TestStoreChunks(t *testing.T) { } } +func TestStoreIndex(t *testing.T) { + for _, fixture := range fixtures { + t.Run(fixture.Name(), func(t *testing.T) { + storageClient, tableClient, schemaConfig, err := fixture.Clients() + require.NoError(t, err) + defer fixture.Teardown() + + tableManager, err := chunk.NewTableManager(schemaConfig, 12*time.Hour, tableClient) + require.NoError(t, err) + + err = tableManager.SyncTables(context.Background()) + require.NoError(t, err) + + testStorageClientIndex(t, storageClient, tableClient) + }) + } +} + func testStorageClientChunks(t *testing.T, client chunk.StorageClient) { const batchSize = 50 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -88,6 +106,135 @@ func testStorageClientChunks(t *testing.T, client chunk.StorageClient) { } } +func testStorageClientIndex(t 
*testing.T, client chunk.StorageClient, tableClient chunk.TableClient) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + type writeBatch []struct { + tableName, hashValue string + rangeValue []byte + value []byte + } + + cases := []struct { + batch writeBatch + + query []chunk.IndexQuery + }{ + { + batch: writeBatch{ + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("range1"), + value: []byte("1"), + }, + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("range2"), + value: []byte("2"), + }, + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("range3"), + value: []byte("3"), + }, + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("bleepboop"), + value: []byte("4"), + }, + { + tableName: "t1", + hashValue: "hash2", + rangeValue: []byte("bleepboop"), + value: []byte("5"), + }, + }, + + query: []chunk.IndexQuery{ + { + TableName: "t1", + HashValue: "hash1", + RangeValuePrefix: []byte("range"), + }, + { + TableName: "t1", + HashValue: "hash1", + RangeValueStart: []byte("range2"), + }, + }, + }, + } + + for _, c := range cases { + mockSC := chunk.NewMockStorage() + require.NoError(t, mockSC.CreateTable(ctx, chunk.TableDesc{ + Name: c.batch[0].tableName, + })) + require.NoError(t, tableClient.CreateTable(ctx, chunk.TableDesc{ + Name: c.batch[0].tableName, + ProvisionedRead: 100000, + ProvisionedWrite: 100000, + })) + + mockWB := mockSC.NewWriteBatch() + actualWB := client.NewWriteBatch() + + for _, b := range c.batch { + mockWB.Add(b.tableName, b.hashValue, b.rangeValue, b.value) + actualWB.Add(b.tableName, b.hashValue, b.rangeValue, b.value) + } + + require.NoError(t, mockSC.BatchWrite(ctx, mockWB)) + require.NoError(t, client.BatchWrite(ctx, actualWB)) + + type readVal struct { + rangeValue string + value []byte + } + + for _, qry := range c.query { + expVals := []readVal{} + gotVals := []readVal{} + + err := mockSC.QueryPages(ctx, qry, func(result 
chunk.ReadBatch, lp bool) bool { + for i := 0; i < result.Len(); i++ { + expVals = append(expVals, readVal{ + rangeValue: string(result.RangeValue(i)), + value: result.Value(i), + }) + } + return !lp + }) + require.NoError(t, err) + + err = client.QueryPages(ctx, qry, func(result chunk.ReadBatch, lp bool) bool { + for i := 0; i < result.Len(); i++ { + gotVals = append(gotVals, readVal{ + rangeValue: string(result.RangeValue(i)), + value: result.Value(i), + }) + } + return !lp + }) + require.NoError(t, err) + + sort.Slice(expVals, func(i, j int) bool { + return expVals[i].rangeValue < expVals[j].rangeValue + }) + sort.Slice(gotVals, func(i, j int) bool { + return gotVals[i].rangeValue < gotVals[j].rangeValue + }) + + require.Equal(t, expVals, gotVals) + } + } +} + const userID = "userID" func dummyChunk(now model.Time) chunk.Chunk { diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index 7ba0eace983..e74143d4002 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -8,7 +8,7 @@ type StorageClient interface { NewWriteBatch() WriteBatch BatchWrite(context.Context, WriteBatch) error - // For the read path. + // This retrieves the query result in batches and passes it to the callback. The return value of the callback would determine if more data should be retrieved. QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error // For storing and retrieving chunks. @@ -23,7 +23,9 @@ type WriteBatch interface { // ReadBatch represents the results of a QueryPages. type ReadBatch interface { + // The total number of entries in this batch. Len() int + // The RangeValue key and Value for the index. RangeValue(index int) []byte Value(index int) []byte } From 9730567845ddb6b94304c357c4aaef5fad370cd6 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 16 Mar 2018 16:38:15 +0530 Subject: [PATCH 2/3] Hash the hashvalue in BigTable for better dist. 
Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/gcp/storage_client.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index 922b7c3e777..cfb79c69bcb 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -2,6 +2,7 @@ package gcp import ( "context" + "crypto/sha256" "flag" "fmt" "strings" @@ -95,10 +96,10 @@ func (b bigtableWriteBatchV2) Add(tableName, hashValue string, rangeValue []byte b.tables[tableName] = rows } - // TODO the hashValue should actually be hashed - but I have data written in - // this format, so we need to do a proper migration. - // TODO TODO ^ - rowKey := hashValue + hasher := sha256.New() + hasher.Write([]byte(hashValue)) + + rowKey := string(hasher.Sum(nil)) mutation, ok := rows[rowKey] if !ok { mutation = bigtable.NewMutation() @@ -150,7 +151,11 @@ func (s *storageClientV2) QueryPages(ctx context.Context, query chunk.IndexQuery rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValueStart), ""))) } - r, err := table.ReadRow(ctx, query.HashValue, rOpts...) + hasher := sha256.New() + hasher.Write([]byte(query.HashValue)) + hashValue := string(hasher.Sum(nil)) + + r, err := table.ReadRow(ctx, hashValue, rOpts...) if err != nil { sp.LogFields(otlog.String("error", err.Error())) return errors.WithStack(err) @@ -165,7 +170,7 @@ func (s *storageClientV2) QueryPages(ctx context.Context, query chunk.IndexQuery val[i].Column = strings.TrimPrefix(val[i].Column, columnFamily+":") // TODO: Hacky hacky ^ } - callback(bigtableReadBatchV2(val), true) // TODO: Check true or false. + callback(bigtableReadBatchV2(val), true) // TODO: Pass nothing to cb. 
return nil } From bfbfdacb602a2de6b85e49b80416e1d65935f76d Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Sat, 17 Mar 2018 17:49:46 +0530 Subject: [PATCH 3/3] Address feedback Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/gcp/fixtures.go | 23 ++--- pkg/chunk/gcp/storage_client.go | 172 ++++++++++++++------------ 2 files changed, 83 insertions(+), 112 deletions(-) diff --git a/pkg/chunk/gcp/fixtures.go b/pkg/chunk/gcp/fixtures.go index 3e55ab06a05..04c007c7240 100644 --- a/pkg/chunk/gcp/fixtures.go +++ b/pkg/chunk/gcp/fixtures.go @@ -22,7 +22,7 @@ type fixture struct { srv *bttest.Server name string - version int + columnKeyClient bool } func (f *fixture) Name() string { @@ -65,18 +65,10 @@ func (f *fixture) Clients() ( client: adminClient, } - if f.version == 1 { - sClient = &storageClientV1{ - storageClientV2{ - schemaCfg: schemaConfig, - client: client, - }, - } + if f.columnKeyClient { + sClient = newStorageClientColumnKey(Config{}, client, schemaConfig) } else { - sClient = &storageClientV2{ - schemaCfg: schemaConfig, - client: client, - } + sClient = newStorageClientV1(Config{}, client, schemaConfig) } return } @@ -89,11 +81,10 @@ func (f *fixture) Teardown() error { // Fixtures for unit testing GCP storage. var Fixtures = []chunk.Fixture{ &fixture{ - name: "GCPv2", - version: 2, + name: "GCP-ColumnKey", + columnKeyClient: true, }, &fixture{ - name: "GCPv1", - version: 1, + name: "GCPv1", }, } diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index cfb79c69bcb..1d84c1eed6e 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -2,9 +2,9 @@ package gcp import ( "context" - "crypto/sha256" "flag" "fmt" + "hash/fnv" "strings" "cloud.google.com/go/bigtable" @@ -36,19 +36,20 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } type storageClientV1 struct { - storageClientV2 + storageClientColumnKey } -// storageClientV2 implements chunk.storageClientV2 for GCP.
-type storageClientV2 struct { +// storageClientColumnKey implements chunk.storageClient for GCP. +type storageClientColumnKey struct { cfg Config schemaCfg chunk.SchemaConfig client *bigtable.Client + keysFn keysFn } // NewStorageClient returns a new StorageClient. func NewStorageClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { - return NewStorageClientV2(ctx, cfg, schemaCfg) + return NewStorageClientColumnKey(ctx, cfg, schemaCfg) } // NewStorageClientV1 returns a new v1 StorageClient. @@ -57,60 +58,83 @@ func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaC if err != nil { return nil, err } + + return newStorageClientV1(cfg, client, schemaCfg), nil +} + +func newStorageClientV1(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientV1 { return &storageClientV1{ - storageClientV2{ + storageClientColumnKey{ cfg: cfg, schemaCfg: schemaCfg, client: client, + keysFn: func(hashValue string, rangeValue []byte) (string, string) { + // TODO the hashValue should actually be hashed - but I have data written in + // this format, so we need to do a proper migration. + rowKey := hashValue + separator + string(rangeValue) + + return rowKey, column + }, }, - }, nil + } } -// NewStorageClientV2 returns a new v2 StorageClient. -func NewStorageClientV2(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { +// NewStorageClientColumnKey returns a new v2 StorageClient. +func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...) 
if err != nil { return nil, err } - return &storageClientV2{ + + return newStorageClientColumnKey(cfg, client, schemaCfg), nil +} + +func newStorageClientColumnKey(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientColumnKey { + return &storageClientColumnKey{ cfg: cfg, schemaCfg: schemaCfg, client: client, - }, nil + keysFn: func(hashValue string, rangeValue []byte) (string, string) { + // We are hashing the hash value to improve distribution of keys. + return hashKey(hashValue), string(rangeValue) + }, + } } -func (s *storageClientV2) NewWriteBatch() chunk.WriteBatch { - return bigtableWriteBatchV2{ +func (s *storageClientColumnKey) NewWriteBatch() chunk.WriteBatch { + return bigtableWriteBatch{ tables: map[string]map[string]*bigtable.Mutation{}, + keysFn: s.keysFn, } } -type bigtableWriteBatchV2 struct { +// keysFn returns the row and column keys for the given hash and range keys. +type keysFn func(hashValue string, rangeValue []byte) (rowKey, columnKey string) + +type bigtableWriteBatch struct { tables map[string]map[string]*bigtable.Mutation + keysFn keysFn } -func (b bigtableWriteBatchV2) Add(tableName, hashValue string, rangeValue []byte, value []byte) { +func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { rows, ok := b.tables[tableName] if !ok { rows = map[string]*bigtable.Mutation{} b.tables[tableName] = rows } - hasher := sha256.New() - hasher.Write([]byte(hashValue)) - - rowKey := string(hasher.Sum(nil)) + rowKey, columnKey := b.keysFn(hashValue, rangeValue) mutation, ok := rows[rowKey] if !ok { mutation = bigtable.NewMutation() rows[rowKey] = mutation } - mutation.Set(columnFamily, string(rangeValue), 0, value) + mutation.Set(columnFamily, columnKey, 0, value) } -func (s *storageClientV2) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { - bigtableBatch := batch.(bigtableWriteBatchV2) +func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.WriteBatch) 
error { + bigtableBatch := batch.(bigtableWriteBatch) for tableName, rows := range bigtableBatch.tables { table := s.client.Open(tableName) @@ -135,7 +159,7 @@ func (s *storageClientV2) BatchWrite(ctx context.Context, batch chunk.WriteBatch return nil } -func (s *storageClientV2) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() @@ -151,9 +175,7 @@ func (s *storageClientV2) QueryPages(ctx context.Context, query chunk.IndexQuery rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValueStart), ""))) } - hasher := sha256.New() - hasher.Write([]byte(query.HashValue)) - hashValue := string(hasher.Sum(nil)) + hashValue := hashKey(query.HashValue) r, err := table.ReadRow(ctx, hashValue, rOpts...) if err != nil { @@ -166,30 +188,34 @@ func (s *storageClientV2) QueryPages(ctx context.Context, query chunk.IndexQuery panic("bad response from bigtable, columnFamily missing") } - for i := range val { - val[i].Column = strings.TrimPrefix(val[i].Column, columnFamily+":") - // TODO: Hacky hacky ^ - } - callback(bigtableReadBatchV2(val), true) // TODO: Pass nothing to cb. + callback(bigtableReadBatchColumnKey{ + items: val, + columnPrefix: columnFamily + ":", + }, true) // TODO: Pass nothing to cb. return nil } -// bigtableReadBatchV2 represents a batch of values read from Bigtable. -type bigtableReadBatchV2 []bigtable.ReadItem +// bigtableReadBatchColumnKey represents a batch of values read from Bigtable. 
+type bigtableReadBatchColumnKey struct { + items []bigtable.ReadItem + columnPrefix string +} -func (b bigtableReadBatchV2) Len() int { - return len(b) +func (b bigtableReadBatchColumnKey) Len() int { + return len(b.items) } -func (b bigtableReadBatchV2) RangeValue(index int) []byte { - return []byte(b[index].Column) +func (b bigtableReadBatchColumnKey) RangeValue(index int) []byte { + return []byte( + strings.TrimPrefix(b.items[index].Column, b.columnPrefix), + ) } -func (b bigtableReadBatchV2) Value(index int) []byte { - return b[index].Value +func (b bigtableReadBatchColumnKey) Value(index int) []byte { + return b.items[index].Value } -func (s *storageClientV2) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { +func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { keys := map[string][]string{} muts := map[string][]*bigtable.Mutation{} @@ -223,7 +249,7 @@ func (s *storageClientV2) PutChunks(ctx context.Context, chunks []chunk.Chunk) e return nil } -func (s *storageClientV2) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { +func (s *storageClientColumnKey) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks") defer sp.Finish() sp.LogFields(otlog.Int("chunks requested", len(input))) @@ -303,61 +329,6 @@ func (s *storageClientV2) GetChunks(ctx context.Context, input []chunk.Chunk) ([ return output, nil } -func (s *storageClientV1) NewWriteBatch() chunk.WriteBatch { - return bigtableWriteBatchV1{ - tables: map[string]map[string]*bigtable.Mutation{}, - } -} - -type bigtableWriteBatchV1 struct { - tables map[string]map[string]*bigtable.Mutation -} - -func (b bigtableWriteBatchV1) Add(tableName, hashValue string, rangeValue []byte, value []byte) { - rows, ok := b.tables[tableName] - if !ok { - rows = map[string]*bigtable.Mutation{} - b.tables[tableName] = rows - } - - // TODO the hashValue should actually be hashed - 
but I have data written in - // this format, so we need to do a proper migration. - rowKey := hashValue + separator + string(rangeValue) - mutation, ok := rows[rowKey] - if !ok { - mutation = bigtable.NewMutation() - rows[rowKey] = mutation - } - - mutation.Set(columnFamily, column, 0, value) -} - -func (s *storageClientV1) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { - bigtableBatch := batch.(bigtableWriteBatchV1) - - for tableName, rows := range bigtableBatch.tables { - table := s.client.Open(tableName) - rowKeys := make([]string, 0, len(rows)) - muts := make([]*bigtable.Mutation, 0, len(rows)) - for rowKey, mut := range rows { - rowKeys = append(rowKeys, rowKey) - muts = append(muts, mut) - } - - errs, err := table.ApplyBulk(ctx, rowKeys, muts) - if err != nil { - return err - } - for _, err := range errs { - if err != nil { - return err - } - } - } - - return nil -} - func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() @@ -411,3 +382,12 @@ func (b bigtableReadBatchV1) Value(index int) []byte { } return cf[0].Value } + +func hashKey(key string) string { + hasher := fnv.New64a() + hasher.Write([]byte(key)) + + hashedKey := string(hasher.Sum(nil)) + + return hashedKey + key // For maintaining uniqueness. +}