Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions pkg/chunk/gcp/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const (
// fixture is a test fixture that runs an in-process Bigtable emulator.
type fixture struct {
	srv  *bttest.Server
	name string

	// columnKeyClient selects the column-key (v2) storage client for this
	// fixture; false selects the legacy v1 client.
	columnKeyClient bool
}

func (f *fixture) Name() string {
Expand Down Expand Up @@ -59,13 +61,15 @@ func (f *fixture) Clients() (
Prefix: "chunks",
},
}
sClient = &storageClient{
schemaCfg: schemaConfig,
client: client,
}
tClient = &tableClient{
client: adminClient,
}

if !f.columnKeyClient {
sClient = newStorageClientColumnKey(Config{}, client, schemaConfig)
} else {
sClient = newStorageClientV1(Config{}, client, schemaConfig)
}
return
}

Expand All @@ -77,6 +81,10 @@ func (f *fixture) Teardown() error {
// Fixtures for unit testing GCP storage.
var Fixtures = []chunk.Fixture{
	&fixture{
		// Exercises the column-key (v2) storage client.
		name:            "GCP-ColumnKey",
		columnKeyClient: true,
	},
	&fixture{
		// Exercises the legacy v1 row-key storage client.
		name: "GCPv1",
	},
}
201 changes: 158 additions & 43 deletions pkg/chunk/gcp/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"hash/fnv"
"strings"

"cloud.google.com/go/bigtable"
Expand Down Expand Up @@ -34,34 +35,85 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.instance, "bigtable.instance", "", "Bigtable instance ID.")
}

// storageClient implements chunk.storageClient for GCP.
type storageClient struct {
type storageClientV1 struct {
storageClientColumnKey
}

// storageClientColumnKey implements chunk.storageClient for GCP.
type storageClientColumnKey struct {
cfg Config
schemaCfg chunk.SchemaConfig
client *bigtable.Client
keysFn keysFn
}

// NewStorageClient returns a new StorageClient. It currently delegates to
// the column-key (v2) implementation.
func NewStorageClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
	return NewStorageClientColumnKey(ctx, cfg, schemaCfg)
}

// NewStorageClientV1 returns a new v1 StorageClient backed by a Bigtable
// client built from cfg; the v1 client uses the legacy row-key scheme (see
// newStorageClientV1).
func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
	client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...)
	if err != nil {
		return nil, err
	}

	return newStorageClientV1(cfg, client, schemaCfg), nil
}

// newStorageClientV1 builds the legacy (v1) client. Its keysFn puts the
// whole range value into the row key (hashValue + separator + rangeValue)
// and writes every cell under the single fixed column.
func newStorageClientV1(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientV1 {
	return &storageClientV1{
		storageClientColumnKey{
			cfg:       cfg,
			schemaCfg: schemaCfg,
			client:    client,
			keysFn: func(hashValue string, rangeValue []byte) (string, string) {
				// TODO the hashValue should actually be hashed - but I have data written in
				// this format, so we need to do a proper migration.
				rowKey := hashValue + separator + string(rangeValue)

				return rowKey, column
			},
		},
	}
}

// NewStorageClientColumnKey returns a new v2 StorageClient.
func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...)
if err != nil {
return nil, err
}
return &storageClient{

return newStorageClientV1(cfg, client, schemaCfg), nil
}

func newStorageClientColumnKey(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientColumnKey {
return &storageClientColumnKey{
cfg: cfg,
schemaCfg: schemaCfg,
client: client,
}, nil
keysFn: func(hashValue string, rangeValue []byte) (string, string) {
// We are hashing the hash value to improve distribution of keys.
return hashKey(hashValue), string(rangeValue)
},
}
}

func (s *storageClient) NewWriteBatch() chunk.WriteBatch {
func (s *storageClientColumnKey) NewWriteBatch() chunk.WriteBatch {
return bigtableWriteBatch{
tables: map[string]map[string]*bigtable.Mutation{},
keysFn: s.keysFn,
}
}

// keysFn returns the row and column keys for the given hash and range keys.
type keysFn func(hashValue string, rangeValue []byte) (rowKey, columnKey string)

// bigtableWriteBatch buffers mutations, grouped by table and row key, until
// BatchWrite applies them.
type bigtableWriteBatch struct {
	tables map[string]map[string]*bigtable.Mutation
	keysFn keysFn
}

func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) {
Expand All @@ -71,19 +123,17 @@ func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte,
b.tables[tableName] = rows
}

// TODO the hashValue should actually be hashed - but I have data written in
// this format, so we need to do a proper migration.
rowKey := hashValue + separator + string(rangeValue)
rowKey, columnKey := b.keysFn(hashValue, rangeValue)
mutation, ok := rows[rowKey]
if !ok {
mutation = bigtable.NewMutation()
rows[rowKey] = mutation
}

mutation.Set(columnFamily, column, 0, value)
mutation.Set(columnFamily, columnKey, 0, value)
}

func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
bigtableBatch := batch.(bigtableWriteBatch)

for tableName, rows := range bigtableBatch.tables {
Expand All @@ -109,61 +159,63 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch)
return nil
}

func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error {
func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error {
sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue})
defer sp.Finish()

table := s.client.Open(query.TableName)

var rowRange bigtable.RowRange
rOpts := []bigtable.ReadOption{
bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)),
}

if len(query.RangeValuePrefix) > 0 {
rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix))
rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnFilter(string(query.RangeValuePrefix)+".*"))) // TODO: Check again and anchor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah this is not particularly desirable. @mbrukman do you know of a better way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@garye, what are your thoughts on this?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused, this is doing a ColumnFilter with a value prefix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We want to filter by prefix.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just confused because this is setting up a filter on column names, but the RangeValuePrefix implies that the filter should be used for values instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a Cortex-ism; our datamodel at this level is super simple, consisting of three-tuples of (hash value, range value, 'cell' value). We always specify the hash value (mapped to the row key in bigtable), want to do range queries over the range value, and equality filtering on the 'cell' value.

Sorry for the confusion.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks for the explanation!

I'm not sure there's a significantly better way to do this... It's probably worth doing this filtering on the bigtable side but, if the amount of data that would get filtered out by this ColumnFilter is small, consider measuring against doing it all client-side.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, would it be valid to apply the same trick @bcotton added to the row ranges; use range query from prefix to prefix+null ?

} else if len(query.RangeValueStart) > 0 {
rowRange = bigtable.NewRange(query.HashValue+separator+string(query.RangeValueStart), query.HashValue+separator+string('\xff'))
} else {
rowRange = bigtable.PrefixRange(query.HashValue + separator)
rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValueStart), "")))
}

err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool {
return callback(bigtableReadBatch(r), false)
}, bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)))
hashValue := hashKey(query.HashValue)

r, err := table.ReadRow(ctx, hashValue, rOpts...)
if err != nil {
sp.LogFields(otlog.String("error", err.Error()))
return errors.WithStack(err)
}

val, ok := r[columnFamily]
if !ok {
panic("bad response from bigtable, columnFamily missing")
}

callback(bigtableReadBatchColumnKey{
items: val,
columnPrefix: columnFamily + ":",
}, true) // TODO: Pass nothing to cb.
return nil
}

// bigtableReadBatch represents a batch of rows read from Bigtable. As the
// bigtable interface gives us rows one-by-one, a batch always only contains
// a single row.
type bigtableReadBatch bigtable.Row
// bigtableReadBatchColumnKey represents a batch of values read from Bigtable.
type bigtableReadBatchColumnKey struct {
items []bigtable.ReadItem
columnPrefix string
}

func (bigtableReadBatch) Len() int {
return 1
func (b bigtableReadBatchColumnKey) Len() int {
return len(b.items)
}

func (b bigtableReadBatch) RangeValue(index int) []byte {
if index != 0 {
panic("index != 0")
}
// String before the first separator is the hashkey
parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2)
return []byte(parts[1])
func (b bigtableReadBatchColumnKey) RangeValue(index int) []byte {
return []byte(
strings.TrimPrefix(b.items[index].Column, b.columnPrefix),
)
}

func (b bigtableReadBatch) Value(index int) []byte {
if index != 0 {
panic("index != 0")
}
cf, ok := b[columnFamily]
if !ok || len(cf) != 1 {
panic("bad response from bigtable")
}
return cf[0].Value
func (b bigtableReadBatchColumnKey) Value(index int) []byte {
return b.items[index].Value
}

func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
keys := map[string][]string{}
muts := map[string][]*bigtable.Mutation{}

Expand Down Expand Up @@ -197,7 +249,7 @@ func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
return nil
}

func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) {
func (s *storageClientColumnKey) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) {
sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks")
defer sp.Finish()
sp.LogFields(otlog.Int("chunks requested", len(input)))
Expand Down Expand Up @@ -276,3 +328,66 @@ func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c

return output, nil
}

// QueryPages reads index entries for query using the legacy row-key layout
// (hashValue + separator + rangeValue) and invokes callback once per row.
func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error {
	sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue})
	defer sp.Finish()

	table := s.client.Open(query.TableName)

	// All row keys for this query share the hash-value prefix; narrow the
	// range further when the query constrains the range value.
	prefix := query.HashValue + separator
	var rowRange bigtable.RowRange
	switch {
	case len(query.RangeValuePrefix) > 0:
		rowRange = bigtable.PrefixRange(prefix + string(query.RangeValuePrefix))
	case len(query.RangeValueStart) > 0:
		rowRange = bigtable.NewRange(prefix+string(query.RangeValueStart), prefix+string('\xff'))
	default:
		rowRange = bigtable.PrefixRange(prefix)
	}

	if err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool {
		return callback(bigtableReadBatchV1(r), false)
	}, bigtable.RowFilter(bigtable.FamilyFilter(columnFamily))); err != nil {
		sp.LogFields(otlog.String("error", err.Error()))
		return errors.WithStack(err)
	}
	return nil
}

// bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the
// bigtable interface gives us rows one-by-one, a batch always only contains
// a single row.
type bigtableReadBatchV1 bigtable.Row

// Len always reports one entry, since each batch wraps exactly one row.
func (bigtableReadBatchV1) Len() int {
	return 1
}

// RangeValue extracts the range value from the row key. index must be 0: a
// v1 batch always holds a single row.
func (b bigtableReadBatchV1) RangeValue(index int) []byte {
	if index != 0 {
		panic("index != 0")
	}
	// Row keys are laid out as <hash value><separator><range value>; the
	// range value is everything after the first separator.
	pieces := strings.SplitN(bigtable.Row(b).Key(), separator, 2)
	return []byte(pieces[1])
}

// Value returns the cell value of the single row in this batch. index must
// be 0, and the row must contain exactly one cell in our column family.
func (b bigtableReadBatchV1) Value(index int) []byte {
	if index != 0 {
		panic("index != 0")
	}
	items, ok := b[columnFamily]
	if !ok || len(items) != 1 {
		panic("bad response from bigtable")
	}
	return items[0].Value
}

// hashKey prepends the raw FNV-1a 64-bit hash of key to key itself. The
// 8-byte hash prefix spreads otherwise-sequential keys across tablets, while
// keeping the original key as a suffix preserves uniqueness.
func hashKey(key string) string {
	h := fnv.New64a()
	h.Write([]byte(key)) // fnv's Write never returns an error.
	return string(h.Sum(nil)) + key
}
4 changes: 3 additions & 1 deletion pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Config struct {

// RegisterFlags adds the flags required to configure this flag set.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.StringVar(&cfg.StorageClient, "chunk.storage-client", "aws", "Which storage client to use (aws, gcp, cassandra, inmemory).")
flag.StringVar(&cfg.StorageClient, "chunk.storage-client", "aws", "Which storage client to use (aws, gcp, gcpv1, cassandra, inmemory).")
cfg.AWSStorageConfig.RegisterFlags(f)
cfg.GCPStorageConfig.RegisterFlags(f)
cfg.CassandraStorageConfig.RegisterFlags(f)
Expand All @@ -41,6 +41,8 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageCl
level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path)
}
return aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg)
case "gcpv1":
return gcp.NewStorageClientV1(context.Background(), cfg.GCPStorageConfig, schemaCfg)
case "gcp":
return gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
case "cassandra":
Expand Down
Loading