Skip to content
4 changes: 2 additions & 2 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func main() {
}
defer server.Shutdown()

storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
storageOpts, err := storage.Opts(storageConfig, schemaConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err)
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions cmd/lite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func main() {
}
defer server.Shutdown()

storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
storageOpts, err := storage.Opts(storageConfig, schemaConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err)
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions cmd/querier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func main() {
defer server.Shutdown()
server.HTTP.Handle("/ring", r)

storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
storageOpts, err := storage.Opts(storageConfig, schemaConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err)
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
Expand Down
5 changes: 2 additions & 3 deletions cmd/ruler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ func main() {

util.InitLogger(&serverConfig)

storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
storageOpts, err := storage.Opts(storageConfig, schemaConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err)
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageOpts)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
Expand Down
173 changes: 124 additions & 49 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package chunk

import (
"context"
"fmt"
"sort"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -34,84 +33,160 @@ func (a byStart) Len() int { return len(a) }
func (a byStart) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byStart) Less(i, j int) bool { return a[i].start < a[j].start }

// NewStore creates a new Store which delegates to different stores depending
// on time.
func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (Store, error) {
store, err := newStore(cfg, v1Schema(schemaCfg), storage)
if err != nil {
return nil, err
}
// SchemaOpt stores when a schema starts.
type SchemaOpt struct {
From model.Time
NewStore func(StorageClient) (Store, error)
}

stores := []compositeStoreEntry{
{0, store},
}
// SchemaOpts returns the schemas and the times when they activate.
func SchemaOpts(cfg StoreConfig, schemaCfg SchemaConfig) []SchemaOpt {
opts := []SchemaOpt{{
From: 0,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v1Schema(schemaCfg), storage)
},
}}

if schemaCfg.DailyBucketsFrom.IsSet() {
store, err := newStore(cfg, v2Schema(schemaCfg), storage)
if err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.DailyBucketsFrom.Time, store})
opts = append(opts, SchemaOpt{
From: schemaCfg.DailyBucketsFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v2Schema(schemaCfg), storage)
},
})
}

if schemaCfg.Base64ValuesFrom.IsSet() {
store, err := newStore(cfg, v3Schema(schemaCfg), storage)
if err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.Base64ValuesFrom.Time, store})
opts = append(opts, SchemaOpt{
From: schemaCfg.Base64ValuesFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v3Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V4SchemaFrom.IsSet() {
store, err := newStore(cfg, v4Schema(schemaCfg), storage)
if err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.V4SchemaFrom.Time, store})
opts = append(opts, SchemaOpt{
From: schemaCfg.V4SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v4Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V5SchemaFrom.IsSet() {
store, err := newStore(cfg, v5Schema(schemaCfg), storage)
if err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.V5SchemaFrom.Time, store})
opts = append(opts, SchemaOpt{
From: schemaCfg.V5SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v5Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V6SchemaFrom.IsSet() {
store, err := newStore(cfg, v6Schema(schemaCfg), storage)
if err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.V6SchemaFrom.Time, store})
opts = append(opts, SchemaOpt{
From: schemaCfg.V6SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v6Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V7SchemaFrom.IsSet() {
store, err := newStore(cfg, v7Schema(schemaCfg), storage)
if err != nil {
opts = append(opts, SchemaOpt{
From: schemaCfg.V7SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v7Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V8SchemaFrom.IsSet() {
opts = append(opts, SchemaOpt{
From: schemaCfg.V8SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v8Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V9SchemaFrom.IsSet() {
opts = append(opts, SchemaOpt{
From: schemaCfg.V9SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newSeriesStore(cfg, v9Schema(schemaCfg), storage)
},
})
}

return opts
}

// StorageOpt stores when a StorageClient is to be used.
type StorageOpt struct {
From model.Time
Client StorageClient
}

func latest(a, b model.Time) model.Time {
if a.Before(b) {
return b
}
return a
}

// NewStore creates a new Store which delegates to different stores depending
// on time.
func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storageOpts []StorageOpt) (Store, error) {
schemaOpts := SchemaOpts(cfg, schemaCfg)

return newCompositeStore(cfg, schemaCfg, schemaOpts, storageOpts)
}

func newCompositeStore(cfg StoreConfig, schemaCfg SchemaConfig, schemaOpts []SchemaOpt, storageOpts []StorageOpt) (Store, error) {
stores := []compositeStoreEntry{}
add := func(i, j int) error {
schemaOpt := schemaOpts[i]
storageOpt := storageOpts[j]
store, err := schemaOpt.NewStore(storageOpt.Client)
stores = append(stores, compositeStoreEntry{latest(schemaOpt.From, storageOpt.From), store})
return err
}

i, j := 0, 0
for i+1 < len(schemaOpts) && j+1 < len(storageOpts) {
if err := add(i, j); err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.V7SchemaFrom.Time, store})

// Increment the interval that finished first.
nextSchemaOpt := schemaOpts[i+1]
nextStorageOpt := storageOpts[j+1]
if nextSchemaOpt.From.Before(nextStorageOpt.From) {
i++
} else if nextSchemaOpt.From.After(nextStorageOpt.From) {
j++
} else {
i++
j++
}
}

if schemaCfg.V8SchemaFrom.IsSet() {
store, err := newStore(cfg, v8Schema(schemaCfg), storage)
if err != nil {
for ; i+1 < len(schemaOpts); i++ {
if err := add(i, j); err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.V8SchemaFrom.Time, store})
}

if schemaCfg.V9SchemaFrom.IsSet() {
store, err := newSeriesStore(cfg, v9Schema(schemaCfg), storage)
if err != nil {
for ; j+1 < len(storageOpts); j++ {
if err := add(i, j); err != nil {
return nil, err
}
stores = append(stores, compositeStoreEntry{schemaCfg.V9SchemaFrom.Time, store})
}

if !sort.IsSorted(byStart(stores)) {
return nil, fmt.Errorf("schemas not in time-sorted order")
if err := add(i, j); err != nil {
return nil, err
}

return compositeStore{stores}, nil
Expand Down
Loading