diff --git a/pkg/chunk/gcp/fixtures.go b/pkg/chunk/gcp/fixtures.go index 00f8ead45e8..04c007c7240 100644 --- a/pkg/chunk/gcp/fixtures.go +++ b/pkg/chunk/gcp/fixtures.go @@ -21,6 +21,8 @@ const ( type fixture struct { srv *bttest.Server name string + + columnKeyClient bool } func (f *fixture) Name() string { @@ -59,13 +61,15 @@ func (f *fixture) Clients() ( Prefix: "chunks", }, } - sClient = &storageClient{ - schemaCfg: schemaConfig, - client: client, - } tClient = &tableClient{ client: adminClient, } + + if f.columnKeyClient { + sClient = newStorageClientColumnKey(Config{}, client, schemaConfig) + } else { + sClient = newStorageClientV1(Config{}, client, schemaConfig) + } return } @@ -77,6 +81,10 @@ func (f *fixture) Teardown() error { // Fixtures for unit testing GCP storage. var Fixtures = []chunk.Fixture{ &fixture{ - name: "GCP", + name: "GCP-ColumnKey", + columnKeyClient: true, + }, + &fixture{ + name: "GCPv1", }, } diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index d3666fc474a..1d84c1eed6e 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -4,6 +4,7 @@ import ( "context", "flag", "fmt", + "hash/fnv", "strings", "cloud.google.com/go/bigtable" @@ -34,34 +35,85 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.instance, "bigtable.instance", "", "Bigtable instance ID.") } -// storageClient implements chunk.storageClient for GCP. -type storageClient struct { +type storageClientV1 struct { + storageClientColumnKey +} + +// storageClientColumnKey implements chunk.storageClient for GCP. +type storageClientColumnKey struct { cfg Config schemaCfg chunk.SchemaConfig client *bigtable.Client + keysFn keysFn } // NewStorageClient returns a new StorageClient. func NewStorageClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { + return NewStorageClientColumnKey(ctx, cfg, schemaCfg) +} + +// NewStorageClientV1 returns a new v1 StorageClient. 
+func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { + client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...) + if err != nil { + return nil, err + } + + return newStorageClientV1(cfg, client, schemaCfg), nil +} + +func newStorageClientV1(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientV1 { + return &storageClientV1{ + storageClientColumnKey{ + cfg: cfg, + schemaCfg: schemaCfg, + client: client, + keysFn: func(hashValue string, rangeValue []byte) (string, string) { + // TODO the hashValue should actually be hashed - but I have data written in + // this format, so we need to do a proper migration. + rowKey := hashValue + separator + string(rangeValue) + + return rowKey, column + }, + }, + } +} + +// NewStorageClientColumnKey returns a new v2 StorageClient. +func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...) if err != nil { return nil, err } - return &storageClient{ + + return newStorageClientColumnKey(cfg, client, schemaCfg), nil +} + +func newStorageClientColumnKey(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientColumnKey { + return &storageClientColumnKey{ cfg: cfg, schemaCfg: schemaCfg, client: client, - }, nil + keysFn: func(hashValue string, rangeValue []byte) (string, string) { + // We are hashing the hash value to improve distribution of keys. + return hashKey(hashValue), string(rangeValue) + }, + } } -func (s *storageClient) NewWriteBatch() chunk.WriteBatch { +func (s *storageClientColumnKey) NewWriteBatch() chunk.WriteBatch { return bigtableWriteBatch{ tables: map[string]map[string]*bigtable.Mutation{}, + keysFn: s.keysFn, } } +// keysFn returns the row and column keys for the given hash and range keys. 
+type keysFn func(hashValue string, rangeValue []byte) (rowKey, columnKey string) + type bigtableWriteBatch struct { tables map[string]map[string]*bigtable.Mutation + keysFn keysFn } func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { @@ -71,19 +123,17 @@ func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, b.tables[tableName] = rows } - // TODO the hashValue should actually be hashed - but I have data written in - // this format, so we need to do a proper migration. - rowKey := hashValue + separator + string(rangeValue) + rowKey, columnKey := b.keysFn(hashValue, rangeValue) mutation, ok := rows[rowKey] if !ok { mutation = bigtable.NewMutation() rows[rowKey] = mutation } - mutation.Set(columnFamily, column, 0, value) + mutation.Set(columnFamily, columnKey, 0, value) } -func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { +func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { bigtableBatch := batch.(bigtableWriteBatch) for tableName, rows := range bigtableBatch.tables { @@ -109,61 +159,63 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() table := s.client.Open(query.TableName) - var rowRange bigtable.RowRange + rOpts := []bigtable.ReadOption{ + bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)), + } + if len(query.RangeValuePrefix) > 0 { - rowRange = 
bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) + rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnFilter(string(query.RangeValuePrefix)+".*"))) // TODO: Check again and anchor. } else if len(query.RangeValueStart) > 0 { - rowRange = bigtable.NewRange(query.HashValue+separator+string(query.RangeValueStart), query.HashValue+separator+string('\xff')) - } else { - rowRange = bigtable.PrefixRange(query.HashValue + separator) + rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValueStart), ""))) } - err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { - return callback(bigtableReadBatch(r), false) - }, bigtable.RowFilter(bigtable.FamilyFilter(columnFamily))) + hashValue := hashKey(query.HashValue) + + r, err := table.ReadRow(ctx, hashValue, rOpts...) if err != nil { sp.LogFields(otlog.String("error", err.Error())) return errors.WithStack(err) } + + val, ok := r[columnFamily] + if !ok { + panic("bad response from bigtable, columnFamily missing") + } + + callback(bigtableReadBatchColumnKey{ + items: val, + columnPrefix: columnFamily + ":", + }, true) // TODO: Pass nothing to cb. return nil } -// bigtableReadBatch represents a batch of rows read from Bigtable. As the -// bigtable interface gives us rows one-by-one, a batch always only contains -// a single row. -type bigtableReadBatch bigtable.Row +// bigtableReadBatchColumnKey represents a batch of values read from Bigtable. 
+type bigtableReadBatchColumnKey struct { + items []bigtable.ReadItem + columnPrefix string +} -func (bigtableReadBatch) Len() int { - return 1 +func (b bigtableReadBatchColumnKey) Len() int { + return len(b.items) } -func (b bigtableReadBatch) RangeValue(index int) []byte { - if index != 0 { - panic("index != 0") - } - // String before the first separator is the hashkey - parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) - return []byte(parts[1]) +func (b bigtableReadBatchColumnKey) RangeValue(index int) []byte { + return []byte( + strings.TrimPrefix(b.items[index].Column, b.columnPrefix), + ) } -func (b bigtableReadBatch) Value(index int) []byte { - if index != 0 { - panic("index != 0") - } - cf, ok := b[columnFamily] - if !ok || len(cf) != 1 { - panic("bad response from bigtable") - } - return cf[0].Value +func (b bigtableReadBatchColumnKey) Value(index int) []byte { + return b.items[index].Value } -func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { +func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { keys := map[string][]string{} muts := map[string][]*bigtable.Mutation{} @@ -197,7 +249,7 @@ func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err return nil } -func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { +func (s *storageClientColumnKey) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks") defer sp.Finish() sp.LogFields(otlog.Int("chunks requested", len(input))) @@ -276,3 +328,66 @@ func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c return output, nil } + +func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { + sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: 
"tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) + defer sp.Finish() + + table := s.client.Open(query.TableName) + + var rowRange bigtable.RowRange + if len(query.RangeValuePrefix) > 0 { + rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) + } else if len(query.RangeValueStart) > 0 { + rowRange = bigtable.NewRange(query.HashValue+separator+string(query.RangeValueStart), query.HashValue+separator+string('\xff')) + } else { + rowRange = bigtable.PrefixRange(query.HashValue + separator) + } + + err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { + return callback(bigtableReadBatchV1(r), false) + }, bigtable.RowFilter(bigtable.FamilyFilter(columnFamily))) + if err != nil { + sp.LogFields(otlog.String("error", err.Error())) + return errors.WithStack(err) + } + return nil +} + +// bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the +// bigtable interface gives us rows one-by-one, a batch always only contains +// a single row. +type bigtableReadBatchV1 bigtable.Row + +func (bigtableReadBatchV1) Len() int { + return 1 +} + +func (b bigtableReadBatchV1) RangeValue(index int) []byte { + if index != 0 { + panic("index != 0") + } + // String before the first separator is the hashkey + parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) + return []byte(parts[1]) +} + +func (b bigtableReadBatchV1) Value(index int) []byte { + if index != 0 { + panic("index != 0") + } + cf, ok := b[columnFamily] + if !ok || len(cf) != 1 { + panic("bad response from bigtable") + } + return cf[0].Value +} + +func hashKey(key string) string { + hasher := fnv.New64a() + hasher.Write([]byte(key)) + + hashedKey := string(hasher.Sum(nil)) + + return hashedKey + key // For maintaining uniqueness. 
+} diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index b48b0d2f502..14d34c234f8 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -24,7 +24,7 @@ type Config struct { // RegisterFlags adds the flags required to configure this flag set. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - flag.StringVar(&cfg.StorageClient, "chunk.storage-client", "aws", "Which storage client to use (aws, gcp, cassandra, inmemory).") + flag.StringVar(&cfg.StorageClient, "chunk.storage-client", "aws", "Which storage client to use (aws, gcp, gcpv1, cassandra, inmemory).") cfg.AWSStorageConfig.RegisterFlags(f) cfg.GCPStorageConfig.RegisterFlags(f) cfg.CassandraStorageConfig.RegisterFlags(f) @@ -41,6 +41,8 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageCl level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } return aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg) + case "gcpv1": + return gcp.NewStorageClientV1(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "gcp": return gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index 7691ae41aec..83cf34dbd6e 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -37,6 +37,24 @@ func TestStoreChunks(t *testing.T) { } } +func TestStoreIndex(t *testing.T) { + for _, fixture := range fixtures { + t.Run(fixture.Name(), func(t *testing.T) { + storageClient, tableClient, schemaConfig, err := fixture.Clients() + require.NoError(t, err) + defer fixture.Teardown() + + tableManager, err := chunk.NewTableManager(schemaConfig, 12*time.Hour, tableClient) + require.NoError(t, err) + + err = tableManager.SyncTables(context.Background()) + require.NoError(t, err) + + testStorageClientIndex(t, storageClient, tableClient) + }) + } +} + func 
testStorageClientChunks(t *testing.T, client chunk.StorageClient) { const batchSize = 50 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -88,6 +106,135 @@ func testStorageClientChunks(t *testing.T, client chunk.StorageClient) { } } +func testStorageClientIndex(t *testing.T, client chunk.StorageClient, tableClient chunk.TableClient) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + type writeBatch []struct { + tableName, hashValue string + rangeValue []byte + value []byte + } + + cases := []struct { + batch writeBatch + + query []chunk.IndexQuery + }{ + { + batch: writeBatch{ + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("range1"), + value: []byte("1"), + }, + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("range2"), + value: []byte("2"), + }, + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("range3"), + value: []byte("3"), + }, + { + tableName: "t1", + hashValue: "hash1", + rangeValue: []byte("bleepboop"), + value: []byte("4"), + }, + { + tableName: "t1", + hashValue: "hash2", + rangeValue: []byte("bleepboop"), + value: []byte("5"), + }, + }, + + query: []chunk.IndexQuery{ + { + TableName: "t1", + HashValue: "hash1", + RangeValuePrefix: []byte("range"), + }, + { + TableName: "t1", + HashValue: "hash1", + RangeValueStart: []byte("range2"), + }, + }, + }, + } + + for _, c := range cases { + mockSC := chunk.NewMockStorage() + require.NoError(t, mockSC.CreateTable(ctx, chunk.TableDesc{ + Name: c.batch[0].tableName, + })) + require.NoError(t, tableClient.CreateTable(ctx, chunk.TableDesc{ + Name: c.batch[0].tableName, + ProvisionedRead: 100000, + ProvisionedWrite: 100000, + })) + + mockWB := mockSC.NewWriteBatch() + actualWB := client.NewWriteBatch() + + for _, b := range c.batch { + mockWB.Add(b.tableName, b.hashValue, b.rangeValue, b.value) + actualWB.Add(b.tableName, b.hashValue, b.rangeValue, b.value) + } + + require.NoError(t, 
mockSC.BatchWrite(ctx, mockWB)) + require.NoError(t, client.BatchWrite(ctx, actualWB)) + + type readVal struct { + rangeValue string + value []byte + } + + for _, qry := range c.query { + expVals := []readVal{} + gotVals := []readVal{} + + err := mockSC.QueryPages(ctx, qry, func(result chunk.ReadBatch, lp bool) bool { + for i := 0; i < result.Len(); i++ { + expVals = append(expVals, readVal{ + rangeValue: string(result.RangeValue(i)), + value: result.Value(i), + }) + } + return !lp + }) + require.NoError(t, err) + + err = client.QueryPages(ctx, qry, func(result chunk.ReadBatch, lp bool) bool { + for i := 0; i < result.Len(); i++ { + gotVals = append(gotVals, readVal{ + rangeValue: string(result.RangeValue(i)), + value: result.Value(i), + }) + } + return !lp + }) + require.NoError(t, err) + + sort.Slice(expVals, func(i, j int) bool { + return expVals[i].rangeValue < expVals[j].rangeValue + }) + sort.Slice(gotVals, func(i, j int) bool { + return gotVals[i].rangeValue < gotVals[j].rangeValue + }) + + require.Equal(t, expVals, gotVals) + } + } +} + const userID = "userID" func dummyChunk(now model.Time) chunk.Chunk { diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index 7ba0eace983..e74143d4002 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -8,7 +8,7 @@ type StorageClient interface { NewWriteBatch() WriteBatch BatchWrite(context.Context, WriteBatch) error - // For the read path. + // This retrieves the query result in batches and passes it to the callback. The return value of the callback would determine if more data should be retrieved. QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error // For storing and retrieving chunks. @@ -23,7 +23,9 @@ type WriteBatch interface { // ReadBatch represents the results of a QueryPages. type ReadBatch interface { + // The total number of entries in this batch. 
Len() int + // The RangeValue key and Value for the index. RangeValue(index int) []byte Value(index int) []byte }