diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index af8df0997ac..dd4419a0273 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -63,13 +63,13 @@ func main() { } defer server.Shutdown() - storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig) + storageOpts, err := storage.Opts(storageConfig, schemaConfig) if err != nil { level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err) os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 4835422d3a3..ac59e9aecfb 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -71,13 +71,13 @@ func main() { } defer server.Shutdown() - storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig) + storageOpts, err := storage.Opts(storageConfig, schemaConfig) if err != nil { level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err) os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/querier/main.go b/cmd/querier/main.go index 421034427d8..93072e4afb1 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -76,13 +76,13 @@ func main() { defer server.Shutdown() server.HTTP.Handle("/ring", r) - storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig) + storageOpts, err := storage.Opts(storageConfig, schemaConfig) if err != nil { level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err) os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index 0413b109523..2b390cb8ee0 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -50,13 +50,12 @@ func main() { util.InitLogger(&serverConfig) - storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig) + storageOpts, err := storage.Opts(storageConfig, schemaConfig) if err != nil { level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err) os.Exit(1) } - - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index 5bad4e02fbb..ebbd2f38c0e 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -2,7 +2,6 @@ package chunk import ( "context" - "fmt" "sort" "github.com/prometheus/common/model" @@ -34,84 +33,160 @@ func (a byStart) Len() int { return len(a) } func (a byStart) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byStart) Less(i, j int) bool { return a[i].start < a[j].start } -// NewStore creates a new Store which delegates to different stores depending -// on time. -func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (Store, error) { - store, err := newStore(cfg, v1Schema(schemaCfg), storage) - if err != nil { - return nil, err - } +// SchemaOpt stores when a schema starts. +type SchemaOpt struct { + From model.Time + NewStore func(StorageClient) (Store, error) +} - stores := []compositeStoreEntry{ - {0, store}, - } +// SchemaOpts returns the schemas and the times when they activate. +func SchemaOpts(cfg StoreConfig, schemaCfg SchemaConfig) []SchemaOpt { + opts := []SchemaOpt{{ + From: 0, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v1Schema(schemaCfg), storage) + }, + }} if schemaCfg.DailyBucketsFrom.IsSet() { - store, err := newStore(cfg, v2Schema(schemaCfg), storage) - if err != nil { - return nil, err - } - stores = append(stores, compositeStoreEntry{schemaCfg.DailyBucketsFrom.Time, store}) + opts = append(opts, SchemaOpt{ + From: schemaCfg.DailyBucketsFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v2Schema(schemaCfg), storage) + }, + }) } if schemaCfg.Base64ValuesFrom.IsSet() { - store, err := newStore(cfg, v3Schema(schemaCfg), storage) - if err != nil { - return nil, err - } - stores = append(stores, compositeStoreEntry{schemaCfg.Base64ValuesFrom.Time, store}) + opts = append(opts, SchemaOpt{ + From: schemaCfg.Base64ValuesFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v3Schema(schemaCfg), storage) + }, + }) } if schemaCfg.V4SchemaFrom.IsSet() { - store, err := newStore(cfg, v4Schema(schemaCfg), storage) - if err != nil { - return nil, err - } - stores = append(stores, compositeStoreEntry{schemaCfg.V4SchemaFrom.Time, store}) + opts = append(opts, SchemaOpt{ + From: schemaCfg.V4SchemaFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v4Schema(schemaCfg), storage) + }, + }) } if schemaCfg.V5SchemaFrom.IsSet() { - store, err := newStore(cfg, v5Schema(schemaCfg), storage) - if err != nil { - return nil, err - } - stores = append(stores, compositeStoreEntry{schemaCfg.V5SchemaFrom.Time, store}) + opts = append(opts, SchemaOpt{ + From: schemaCfg.V5SchemaFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v5Schema(schemaCfg), storage) + }, + }) } if schemaCfg.V6SchemaFrom.IsSet() { - store, err := newStore(cfg, v6Schema(schemaCfg), storage) - if err != nil { - return nil, err - } - stores = append(stores, compositeStoreEntry{schemaCfg.V6SchemaFrom.Time, store}) + opts = append(opts, SchemaOpt{ + From: schemaCfg.V6SchemaFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v6Schema(schemaCfg), storage) + }, + }) } if schemaCfg.V7SchemaFrom.IsSet() { - store, err := newStore(cfg, v7Schema(schemaCfg), storage) - if err != nil { + opts = append(opts, SchemaOpt{ + From: schemaCfg.V7SchemaFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v7Schema(schemaCfg), storage) + }, + }) + } + + if schemaCfg.V8SchemaFrom.IsSet() { + opts = append(opts, SchemaOpt{ + From: schemaCfg.V8SchemaFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newStore(cfg, v8Schema(schemaCfg), storage) + }, + }) + } + + if schemaCfg.V9SchemaFrom.IsSet() { + opts = append(opts, SchemaOpt{ + From: schemaCfg.V9SchemaFrom.Time, + NewStore: func(storage StorageClient) (Store, error) { + return newSeriesStore(cfg, v9Schema(schemaCfg), storage) + }, + }) + } + + return opts +} + +// StorageOpt stores when a StorageClient is to be used. +type StorageOpt struct { + From model.Time + Client StorageClient +} + +func latest(a, b model.Time) model.Time { + if a.Before(b) { + return b + } + return a +} + +// NewStore creates a new Store which delegates to different stores depending +// on time. +func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storageOpts []StorageOpt) (Store, error) { + schemaOpts := SchemaOpts(cfg, schemaCfg) + + return newCompositeStore(cfg, schemaCfg, schemaOpts, storageOpts) +} + +func newCompositeStore(cfg StoreConfig, schemaCfg SchemaConfig, schemaOpts []SchemaOpt, storageOpts []StorageOpt) (Store, error) { + stores := []compositeStoreEntry{} + add := func(i, j int) error { + schemaOpt := schemaOpts[i] + storageOpt := storageOpts[j] + store, err := schemaOpt.NewStore(storageOpt.Client) + stores = append(stores, compositeStoreEntry{latest(schemaOpt.From, storageOpt.From), store}) + return err + } + + i, j := 0, 0 + for i+1 < len(schemaOpts) && j+1 < len(storageOpts) { + if err := add(i, j); err != nil { return nil, err } - stores = append(stores, compositeStoreEntry{schemaCfg.V7SchemaFrom.Time, store}) + + // Increment the interval that finished first. + nextSchemaOpt := schemaOpts[i+1] + nextStorageOpt := storageOpts[j+1] + if nextSchemaOpt.From.Before(nextStorageOpt.From) { + i++ + } else if nextSchemaOpt.From.After(nextStorageOpt.From) { + j++ + } else { + i++ + j++ + } } - if schemaCfg.V8SchemaFrom.IsSet() { - store, err := newStore(cfg, v8Schema(schemaCfg), storage) - if err != nil { + for ; i+1 < len(schemaOpts); i++ { + if err := add(i, j); err != nil { return nil, err } - stores = append(stores, compositeStoreEntry{schemaCfg.V8SchemaFrom.Time, store}) } - if schemaCfg.V9SchemaFrom.IsSet() { - store, err := newSeriesStore(cfg, v9Schema(schemaCfg), storage) - if err != nil { + for ; j+1 < len(storageOpts); j++ { + if err := add(i, j); err != nil { return nil, err } - stores = append(stores, compositeStoreEntry{schemaCfg.V9SchemaFrom.Time, store}) } - if !sort.IsSorted(byStart(stores)) { - return nil, fmt.Errorf("schemas not in time-sorted order") + if err := add(i, j); err != nil { + return nil, err } return compositeStore{stores}, nil diff --git a/pkg/chunk/composite_store_test.go b/pkg/chunk/composite_store_test.go index 3bb1cb2cb65..947a5003627 100644 --- a/pkg/chunk/composite_store_test.go +++ b/pkg/chunk/composite_store_test.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "reflect" + "strconv" "testing" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/weaveworks/common/test" ) @@ -169,3 +172,160 @@ func TestCompositeStore(t *testing.T) { }) } } + +type dummy struct { + version int + + // Include nil-implementations of these interfaces so I don't have to stub out + // the methods. + StorageClient + Store +} + +func dummySchema(from model.Time, version int) SchemaOpt { + return SchemaOpt{ + From: from, + NewStore: func(s StorageClient) (Store, error) { + return dummy{ + version: version, + StorageClient: s, + }, nil + }, + } +} + +func TestNewStoreTimeConvergence(t *testing.T) { + oldClient := dummy{version: -1} + newClient := dummy{version: -2} + newerClient := dummy{version: -3} + + type expectation struct { + time model.Time + schema int + client StorageClient + } + + for i, testcase := range []struct { + schemaOpts []SchemaOpt + storageOpts []StorageOpt + + expected []expectation + }{ + { + schemaOpts: []SchemaOpt{ + dummySchema(0, 1), + }, + storageOpts: []StorageOpt{ + {0, oldClient}, + }, + expected: []expectation{ + {0, 1, oldClient}, + }, + }, + { + schemaOpts: []SchemaOpt{ + dummySchema(0, 1), + dummySchema(10, 2), + }, + storageOpts: []StorageOpt{ + {0, oldClient}, + }, + expected: []expectation{ + {0, 1, oldClient}, + {10, 2, oldClient}, + }, + }, + { + schemaOpts: []SchemaOpt{ + dummySchema(0, 1), + }, + storageOpts: []StorageOpt{ + {0, oldClient}, + {10, newClient}, + }, + expected: []expectation{ + {0, 1, oldClient}, + {10, 1, newClient}, + }, + }, + { + schemaOpts: []SchemaOpt{ + dummySchema(0, 1), + dummySchema(10, 2), + }, + storageOpts: []StorageOpt{ + {0, oldClient}, + {10, newClient}, + }, + expected: []expectation{ + {0, 1, oldClient}, + {10, 2, newClient}, + }, + }, + { + schemaOpts: []SchemaOpt{ + dummySchema(0, 1), + dummySchema(20, 2), + }, + storageOpts: []StorageOpt{ + {0, oldClient}, + {10, newClient}, + {30, newerClient}, + }, + expected: []expectation{ + {0, 1, oldClient}, + {10, 1, newClient}, + {20, 2, newClient}, + {30, 2, newerClient}, + }, + }, + { + schemaOpts: []SchemaOpt{ + dummySchema(0, 1), + dummySchema(10, 2), + dummySchema(30, 3), + }, + storageOpts: []StorageOpt{ + {0, oldClient}, + {20, newClient}, + }, + expected: []expectation{ + {0, 1, oldClient}, + {10, 2, oldClient}, + {20, 2, newClient}, + {30, 3, newClient}, + }, + }, + { + schemaOpts: []SchemaOpt{ + dummySchema(0, 1), + dummySchema(10, 2), + dummySchema(20, 3), + dummySchema(40, 4), + }, + storageOpts: []StorageOpt{ + {0, oldClient}, + {20, newClient}, + }, + expected: []expectation{ + {0, 1, oldClient}, + {10, 2, oldClient}, + {20, 3, newClient}, + {40, 4, newClient}, + }, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + store, err := newCompositeStore(StoreConfig{}, SchemaConfig{}, testcase.schemaOpts, testcase.storageOpts) + require.NoError(t, err) + cs := store.(compositeStore) + require.Equal(t, len(testcase.expected), len(cs.stores)) + + for i, store := range cs.stores { + assert.Equal(t, testcase.expected[i].time, store.start, "%d", i) + assert.Equal(t, testcase.expected[i].schema, store.Store.(dummy).version, "%d", i) + assert.Equal(t, testcase.expected[i].client, store.Store.(dummy).StorageClient, "%d", i) + } + }) + } +} diff --git a/pkg/chunk/gcp/fixtures.go b/pkg/chunk/gcp/fixtures.go index 80e3fe8deb5..d8339b37931 100644 --- a/pkg/chunk/gcp/fixtures.go +++ b/pkg/chunk/gcp/fixtures.go @@ -22,6 +22,8 @@ const ( type fixture struct { srv *bttest.Server name string + + columnKeyClient bool } func (f *fixture) Name() string { @@ -60,13 +62,16 @@ func (f *fixture) Clients() ( Prefix: "chunks", }, } - sClient = &storageClient{ - schemaCfg: schemaConfig, - client: client, - } tClient = &tableClient{ client: adminClient, } + + if f.columnKeyClient { + sClient = newStorageClientColumnKey(Config{}, client, schemaConfig) + } else { + sClient = newStorageClientV1(Config{}, client, schemaConfig) + } + return } @@ -78,6 +83,10 @@ func (f *fixture) Teardown() error { // Fixtures for unit testing GCP storage. var Fixtures = []testutils.Fixture{ &fixture{ - name: "GCP", + name: "GCP-ColumnKey", + columnKeyClient: true, + }, + &fixture{ + name: "GCPv1", }, } diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index b2d256b760a..18b779c2e9f 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -27,6 +27,8 @@ const ( type Config struct { project string instance string + + ColumnKey bool } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -35,34 +37,86 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.instance, "bigtable.instance", "", "Bigtable instance ID.") } -// storageClient implements chunk.storageClient for GCP. -type storageClient struct { +// NewStorageClient returns a new StorageClient. +func NewStorageClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { + if cfg.ColumnKey { + return NewStorageClientColumnKey(ctx, cfg, schemaCfg) + } + return NewStorageClientV1(ctx, cfg, schemaCfg) +} + +// storageClientColumnKey implements chunk.storageClient for GCP. +type storageClientColumnKey struct { cfg Config schemaCfg chunk.SchemaConfig client *bigtable.Client + keysFn keysFn } -// NewStorageClient returns a new StorageClient. -func NewStorageClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { +// storageClientV1 implements chunk.storageClient for GCP. +type storageClientV1 struct { + storageClientColumnKey +} + +// NewStorageClientV1 returns a new v1 StorageClient. +func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...) if err != nil { return nil, err } - return &storageClient{ + return newStorageClientV1(cfg, client, schemaCfg), nil +} + +func newStorageClientV1(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientV1 { + return &storageClientV1{ + storageClientColumnKey{ + cfg: cfg, + schemaCfg: schemaCfg, + client: client, + keysFn: func(hashValue string, rangeValue []byte) (string, string) { + rowKey := hashValue + separator + string(rangeValue) + return rowKey, column + }, + }, + } +} + +// NewStorageClientColumnKey returns a new v2 StorageClient. +func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { + client, err := bigtable.NewClient(ctx, cfg.project, cfg.instance, instrumentation()...) + if err != nil { + return nil, err + } + + return newStorageClientColumnKey(cfg, client, schemaCfg), nil +} + +func newStorageClientColumnKey(cfg Config, client *bigtable.Client, schemaCfg chunk.SchemaConfig) *storageClientColumnKey { + return &storageClientColumnKey{ cfg: cfg, schemaCfg: schemaCfg, client: client, - }, nil + keysFn: func(hashValue string, rangeValue []byte) (string, string) { + // We could hash the row key for better distribution but we decided against it + // because that would make migrations very, very hard. + return hashValue, string(rangeValue) + }, + } } -func (s *storageClient) NewWriteBatch() chunk.WriteBatch { +func (s *storageClientColumnKey) NewWriteBatch() chunk.WriteBatch { return bigtableWriteBatch{ tables: map[string]map[string]*bigtable.Mutation{}, + keysFn: s.keysFn, } } +// keysFn returns the row and column keys for the given hash and range keys. +type keysFn func(hashValue string, rangeValue []byte) (rowKey, columnKey string) + type bigtableWriteBatch struct { tables map[string]map[string]*bigtable.Mutation + keysFn keysFn } func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { @@ -72,19 +126,17 @@ func (b bigtableWriteBatch) Add(tableName, hashValue string, rangeValue []byte, b.tables[tableName] = rows } - // TODO the hashValue should actually be hashed - but I have data written in - // this format, so we need to do a proper migration. - rowKey := hashValue + separator + string(rangeValue) + rowKey, columnKey := b.keysFn(hashValue, rangeValue) mutation, ok := rows[rowKey] if !ok { mutation = bigtable.NewMutation() rows[rowKey] = mutation } - mutation.Set(columnFamily, column, 0, value) + mutation.Set(columnFamily, columnKey, 0, value) } -func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { +func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { bigtableBatch := batch.(bigtableWriteBatch) for tableName, rows := range bigtableBatch.tables { @@ -110,75 +162,74 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { + const null = string('\xff') + sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() table := s.client.Open(query.TableName) - var rowRange bigtable.RowRange - - /* BigTable only seems to support regex match on cell values, so doing it - client side for now - readOpts := []bigtable.ReadOption{ + rOpts := []bigtable.ReadOption{ bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)), } - if query.ValueEqual != nil { - readOpts = append(readOpts, bigtable.RowFilter(bigtable.ValueFilter(string(query.ValueEqual)))) - } - */ if len(query.RangeValuePrefix) > 0 { - rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) + rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValuePrefix), string(query.RangeValuePrefix)+null))) } else if len(query.RangeValueStart) > 0 { - rowRange = bigtable.NewRange(query.HashValue+separator+string(query.RangeValueStart), query.HashValue+separator+string('\xff')) - } else { - rowRange = bigtable.PrefixRange(query.HashValue + separator) + rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValueStart), null))) } - err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { - if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { - return callback(bigtableReadBatch(r)) - } - return true - }) + r, err := table.ReadRow(ctx, query.HashValue, rOpts...) if err != nil { sp.LogFields(otlog.String("error", err.Error())) return errors.WithStack(err) } + + val, ok := r[columnFamily] + if !ok { + // There are no matching rows. + return nil + } + + if query.ValueEqual != nil { + filteredItems := make([]bigtable.ReadItem, 0, len(val)) + for _, item := range val { + if bytes.Equal(query.ValueEqual, item.Value) { + filteredItems = append(filteredItems, item) + } + } + + val = filteredItems + } + callback(bigtableReadBatchColumnKey{ + items: val, + columnPrefix: columnFamily + ":", + }) return nil } -// bigtableReadBatch represents a batch of rows read from Bigtable. As the -// bigtable interface gives us rows one-by-one, a batch always only contains -// a single row. -type bigtableReadBatch bigtable.Row +// bigtableReadBatchColumnKey represents a batch of values read from Bigtable. +type bigtableReadBatchColumnKey struct { + items []bigtable.ReadItem + columnPrefix string +} -func (bigtableReadBatch) Len() int { - return 1 +func (b bigtableReadBatchColumnKey) Len() int { + return len(b.items) } -func (b bigtableReadBatch) RangeValue(index int) []byte { - if index != 0 { - panic("index != 0") - } - // String before the first separator is the hashkey - parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) - return []byte(parts[1]) +func (b bigtableReadBatchColumnKey) RangeValue(index int) []byte { + return []byte( + strings.TrimPrefix(b.items[index].Column, b.columnPrefix), + ) } -func (b bigtableReadBatch) Value(index int) []byte { - if index != 0 { - panic("index != 0") - } - cf, ok := b[columnFamily] - if !ok || len(cf) != 1 { - panic("bad response from bigtable") - } - return cf[0].Value +func (b bigtableReadBatchColumnKey) Value(index int) []byte { + return b.items[index].Value } -func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { +func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { keys := map[string][]string{} muts := map[string][]*bigtable.Mutation{} @@ -212,7 +263,7 @@ func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err return nil } -func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { +func (s *storageClientColumnKey) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks") defer sp.Finish() sp.LogFields(otlog.Int("chunks requested", len(input))) @@ -291,3 +342,72 @@ func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c return output, nil } + +func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { + const null = string('\xff') + + sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) + defer sp.Finish() + + table := s.client.Open(query.TableName) + + var rowRange bigtable.RowRange + + /* BigTable only seems to support regex match on cell values, so doing it + client side for now + readOpts := []bigtable.ReadOption{ + bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)), + } + if query.ValueEqual != nil { + readOpts = append(readOpts, bigtable.RowFilter(bigtable.ValueFilter(string(query.ValueEqual)))) + } + */ + + if len(query.RangeValuePrefix) > 0 { + rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) + } else if len(query.RangeValueStart) > 0 { + rowRange = bigtable.NewRange(query.HashValue+separator+string(query.RangeValueStart), query.HashValue+separator+null) + } else { + rowRange = bigtable.PrefixRange(query.HashValue + separator) + } + + err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { + if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { + return callback(bigtableReadBatchV1(r)) + } + + return true + }) + if err != nil { + sp.LogFields(otlog.String("error", err.Error())) + return errors.WithStack(err) + } + return nil +} + +// bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the +// bigtable interface gives us rows one-by-one, a batch always only contains +// a single row. +type bigtableReadBatchV1 bigtable.Row + +func (bigtableReadBatchV1) Len() int { + return 1 +} +func (b bigtableReadBatchV1) RangeValue(index int) []byte { + if index != 0 { + panic("index != 0") + } + // String before the first separator is the hashkey + parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) + return []byte(parts[1]) +} +func (b bigtableReadBatchV1) Value(index int) []byte { + if index != 0 { + panic("index != 0") + } + cf, ok := b[columnFamily] + if !ok || len(cf) != 1 { + panic("bad response from bigtable") + } + return cf[0].Value +} diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index 3d777cf5832..0cf2993cb12 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -23,14 +23,15 @@ const ( type SchemaConfig struct { // After midnight on this day, we start bucketing indexes by day instead of by // hour. Only the day matters, not the time within the day. - DailyBucketsFrom util.DayValue - Base64ValuesFrom util.DayValue - V4SchemaFrom util.DayValue - V5SchemaFrom util.DayValue - V6SchemaFrom util.DayValue - V7SchemaFrom util.DayValue - V8SchemaFrom util.DayValue - V9SchemaFrom util.DayValue + DailyBucketsFrom util.DayValue + Base64ValuesFrom util.DayValue + V4SchemaFrom util.DayValue + V5SchemaFrom util.DayValue + V6SchemaFrom util.DayValue + V7SchemaFrom util.DayValue + V8SchemaFrom util.DayValue + V9SchemaFrom util.DayValue + BigtableColumnKeyFrom util.DayValue // Master 'off-switch' for table capacity updates, e.g. when troubleshooting ThroughputUpdatesDisabled bool @@ -61,6 +62,7 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema (Deprecated).") f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema (Deprecated).") f.Var(&cfg.V9SchemaFrom, "dynamodb.v9-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v9 schema (Series indexing).") + f.Var(&cfg.BigtableColumnKeyFrom, "bigtable.column-key-from", "The date (in the format YYYY-MM-DD) after which we use bigtable column keys.") f.BoolVar(&cfg.ThroughputUpdatesDisabled, "table-manager.throughput-updates-disabled", false, "If true, disable all changes to DB capacity") f.DurationVar(&cfg.DynamoDBPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index eddd7f2055d..f1771fe0fc8 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -8,6 +8,8 @@ import ( "time" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/aws" "github.com/weaveworks/cortex/pkg/chunk/cassandra" @@ -37,8 +39,31 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.") } -// NewStorageClient makes a storage client based on the configuration. -func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (client chunk.StorageClient, err error) { +// Opts makes the storage clients based on the configuration. +func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) { + opts := []chunk.StorageOpt{} + client, err := newStorageClient(cfg, schemaCfg) + if err != nil { + return nil, errors.Wrap(err, "error creating storage client") + } + + opts = append(opts, chunk.StorageOpt{From: model.Time(0), Client: client}) + if cfg.StorageClient == "gcp" && schemaCfg.BigtableColumnKeyFrom.IsSet() { + client, err = gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg) + if err != nil { + return nil, errors.Wrap(err, "error creating storage client") + } + + opts = append(opts, chunk.StorageOpt{ + From: schemaCfg.BigtableColumnKeyFrom.Time, + Client: newCachingStorageClient(client, cfg.IndexCacheSize, cfg.IndexCacheValidity), + }) + } + + return opts, nil +} + +func newStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (client chunk.StorageClient, err error) { switch cfg.StorageClient { case "inmemory": client, err = chunk.NewMockStorage(), nil