From a6905813fd126012ace09cefc80586756e5a630b Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 12 Jul 2018 10:55:51 +0100 Subject: [PATCH 1/6] Move compositeSchema abstraction up to componsiteStore. This will allow us to vary the store implementation over time, and not just the schema. This will unblock the new bigtable storage adapter (using columns instead of rows), and allow us to more easily implement the iterative intersections and indexing of series instead of chunks. Signed-off-by: Tom Wilkie --- cmd/ingester/main.go | 2 +- cmd/lite/main.go | 2 +- cmd/querier/main.go | 2 +- cmd/ruler/main.go | 2 +- pkg/chunk/chunk_store.go | 49 +++----- pkg/chunk/chunk_store_test.go | 198 ++++++++++++----------------- pkg/chunk/composite_store.go | 194 +++++++++++++++++++++++++++++ pkg/chunk/composite_store_test.go | 167 +++++++++++++++++++++++++ pkg/chunk/schema_config.go | 200 ------------------------------ pkg/chunk/schema_config_test.go | 145 ---------------------- pkg/chunk/schema_test.go | 104 ---------------- 11 files changed, 463 insertions(+), 602 deletions(-) create mode 100644 pkg/chunk/composite_store.go create mode 100644 pkg/chunk/composite_store_test.go diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 23b7a51960e..fc70dfbd1db 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -70,7 +70,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 096ee7f25d7..ec161e2c87a 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -78,7 +78,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { 
level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/querier/main.go b/cmd/querier/main.go index 9429746a22b..e309107f664 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -80,7 +80,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index f480e4c8c22..6649b10c5da 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -57,7 +57,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 24d3b1c1c20..ef850fe2966 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -57,9 +57,6 @@ type StoreConfig struct { MinChunkAge time.Duration QueryChunkLimit int - - // For injecting different schemas in tests. 
- schemaFactory func(cfg SchemaConfig) Schema } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -69,8 +66,8 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") } -// Store implements Store -type Store struct { +// store implements Store +type store struct { cfg StoreConfig storage StorageClient @@ -78,25 +75,13 @@ type Store struct { schema Schema } -// NewStore makes a new ChunkStore -func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (*Store, error) { - var schema Schema - var err error - if cfg.schemaFactory == nil { - schema, err = newCompositeSchema(schemaCfg) - } else { - schema = cfg.schemaFactory(schemaCfg) - } - if err != nil { - return nil, err - } - +func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (*store, error) { cache, err := cache.New(cfg.CacheConfig) if err != nil { return nil, err } - return &Store{ + return &store{ cfg: cfg, storage: storage, schema: schema, @@ -105,12 +90,12 @@ func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (* } // Stop any background goroutines (ie in the cache.) 
-func (c *Store) Stop() { +func (c *store) Stop() { c.cache.Stop() } // Put implements ChunkStore -func (c *Store) Put(ctx context.Context, chunks []Chunk) error { +func (c *store) Put(ctx context.Context, chunks []Chunk) error { userID, err := user.ExtractOrgID(ctx) if err != nil { return err @@ -125,7 +110,7 @@ func (c *Store) Put(ctx context.Context, chunks []Chunk) error { return c.updateIndex(ctx, userID, chunks) } -func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { +func (c *store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { writeReqs, err := c.calculateDynamoWrites(userID, chunks) if err != nil { return err @@ -136,7 +121,7 @@ func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) // calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all // the chunks it is given. -func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch, error) { +func (c *store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch, error) { seenIndexEntries := map[string]struct{}{} writeReqs := c.storage.NewWriteBatch() @@ -195,7 +180,7 @@ func (s *spanLogger) Log(kvps ...interface{}) error { } // Get implements ChunkStore -func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { +func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { log, ctx := newSpanLogger(ctx, "ChunkStore.Get") defer log.Span.Finish() @@ -234,7 +219,7 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers . 
return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher) } -func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { +func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { log, ctx := newSpanLogger(ctx, "ChunkStore.getMetricNameChunks") defer log.Finish() level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "matchers", len(allMatchers)) @@ -338,7 +323,7 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [ return } -func (c *Store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) { +func (c *store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) { // Get all series from the index userID, err := user.ExtractOrgID(ctx) if err != nil { @@ -402,7 +387,7 @@ outer: return chunks, nil } -func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { +func (c *store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") defer log.Finish() @@ -497,7 +482,7 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode return c.convertChunkIDsToChunks(ctx, chunkIDs) } -func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { +func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { incomingEntries := make(chan []IndexEntry) incomingErrors := make(chan error) for 
_, query := range queries { @@ -526,7 +511,7 @@ func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery return entries, lastErr } -func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { +func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) { @@ -547,7 +532,7 @@ func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I return entries, nil } -func (c *Store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { +func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { result := make([]string, 0, len(entries)) for _, entry := range entries { @@ -569,7 +554,7 @@ func (c *Store) parseIndexEntries(ctx context.Context, entries []IndexEntry, mat return result, nil } -func (c *Store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { +func (c *store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err @@ -587,7 +572,7 @@ func (c *Store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) return chunkSet, nil } -func (c *Store) writeBackCache(ctx context.Context, chunks []Chunk) error { +func (c *store) writeBackCache(ctx context.Context, chunks []Chunk) error { for i := range chunks { encoded, err := chunks[i].Encode() if err != nil { diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 9bf523de0b3..716c5cfa538 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" 
"github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" + "github.com/weaveworks/cortex/pkg/util" "github.com/weaveworks/cortex/pkg/util/extract" "golang.org/x/net/context" @@ -21,15 +22,37 @@ import ( "github.com/weaveworks/common/user" ) +var schemas = []struct { + name string + fn func(cfg SchemaConfig) Schema + requireMetricName bool +}{ + {"v1 schema", v1Schema, true}, + {"v2 schema", v2Schema, true}, + {"v3 schema", v3Schema, true}, + {"v4 schema", v4Schema, true}, + {"v5 schema", v5Schema, true}, + {"v6 schema", v6Schema, true}, + {"v7 schema", v7Schema, true}, + {"v8 schema", v8Schema, false}, +} + // newTestStore creates a new Store for testing. -func newTestChunkStore(t *testing.T, cfg StoreConfig) *Store { +func newTestChunkStore(t *testing.T, schemaFn func(SchemaConfig) Schema) *store { + var ( + storeCfg StoreConfig + schemaCfg SchemaConfig + ) + util.DefaultValues(&storeCfg, &schemaCfg) + storage := NewMockStorage() - schemaCfg := SchemaConfig{} tableManager, err := NewTableManager(schemaCfg, maxChunkAge, storage) require.NoError(t, err) + err = tableManager.SyncTables(context.Background()) require.NoError(t, err) - store, err := NewStore(cfg, schemaCfg, storage) + + store, err := newStore(storeCfg, schemaFn(schemaCfg), storage) require.NoError(t, err) return store } @@ -102,21 +125,6 @@ func TestChunkStore_Get(t *testing.T) { barSampleStream2, err := createSampleStreamFrom(barChunk2) require.NoError(t, err) - schemas := []struct { - name string - fn func(cfg SchemaConfig) Schema - requireMetricName bool - }{ - {"v1 schema", v1Schema, true}, - {"v2 schema", v2Schema, true}, - {"v3 schema", v3Schema, true}, - {"v4 schema", v4Schema, true}, - {"v5 schema", v5Schema, true}, - {"v6 schema", v6Schema, true}, - {"v7 schema", v7Schema, true}, - {"v8 schema", v8Schema, false}, - } - for _, tc := range []struct { query string expect model.Matrix @@ -193,10 +201,7 @@ func TestChunkStore_Get(t *testing.T) { for _, schema := range schemas { 
t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, - QueryChunkLimit: 2e6, - }) + store := newTestChunkStore(t, schema.fn) if err := store.Put(ctx, []Chunk{ fooChunk1, @@ -272,20 +277,6 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { "toms": "code", }) - schemas := []struct { - name string - fn func(cfg SchemaConfig) Schema - }{ - {"v1 schema", v1Schema}, - {"v2 schema", v2Schema}, - {"v3 schema", v3Schema}, - {"v4 schema", v4Schema}, - {"v5 schema", v5Schema}, - {"v6 schema", v6Schema}, - {"v7 schema", v7Schema}, - {"v8 schema", v8Schema}, - } - for _, tc := range []struct { query string expect []Chunk @@ -339,10 +330,7 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { for _, schema := range schemas { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, - QueryChunkLimit: 2e6, - }) + store := newTestChunkStore(t, schema.fn) if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { t.Fatal(err) @@ -369,96 +357,72 @@ func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) func TestChunkStoreRandom(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) - schemas := []struct { - name string - fn func(cfg SchemaConfig) Schema - store *Store - }{ - {name: "v1 schema", fn: v1Schema}, - {name: "v2 schema", fn: v2Schema}, - {name: "v3 schema", fn: v3Schema}, - {name: "v4 schema", fn: v4Schema}, - {name: "v5 schema", fn: v5Schema}, - {name: "v6 schema", fn: v6Schema}, - {name: "v7 schema", fn: v7Schema}, - {name: "v8 schema", fn: v8Schema}, - } - for i := range schemas { - schemas[i].store = newTestChunkStore(t, StoreConfig{ - schemaFactory: schemas[i].fn, - QueryChunkLimit: 2e6, - }) 
- } + for _, schema := range schemas { + t.Run(schema.name, func(t *testing.T) { + store := newTestChunkStore(t, schema.fn) + + // put 100 chunks from 0 to 99 + const chunkLen = 13 * 3600 // in seconds + for i := 0; i < 100; i++ { + ts := model.TimeFromUnix(int64(i * chunkLen)) + chunks, _ := chunk.New().Add(model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(float64(i)), + }) + chunk := NewChunk( + userID, + model.Fingerprint(1), + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + }, + chunks[0], + ts, + ts.Add(chunkLen*time.Second), + ) + + err := store.Put(ctx, []Chunk{chunk}) + require.NoError(t, err) + } - // put 100 chunks from 0 to 99 - const chunkLen = 13 * 3600 // in seconds - for i := 0; i < 100; i++ { - ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := chunk.New().Add(model.SamplePair{ - Timestamp: ts, - Value: model.SampleValue(float64(i)), - }) - chunk := NewChunk( - userID, - model.Fingerprint(1), - model.Metric{ - model.MetricNameLabel: "foo", - "bar": "baz", - }, - chunks[0], - ts, - ts.Add(chunkLen*time.Second), - ) - for _, s := range schemas { - err := s.store.Put(ctx, []Chunk{chunk}) - require.NoError(t, err) - } - } + // pick two random numbers and do a query + for i := 0; i < 100; i++ { + start := rand.Int63n(100 * chunkLen) + end := start + rand.Int63n((100*chunkLen)-start) + assert.True(t, start < end) - // pick two random numbers and do a query - for i := 0; i < 100; i++ { - start := rand.Int63n(100 * chunkLen) - end := start + rand.Int63n((100*chunkLen)-start) - assert.True(t, start < end) + startTime := model.TimeFromUnix(start) + endTime := model.TimeFromUnix(end) - startTime := model.TimeFromUnix(start) - endTime := model.TimeFromUnix(end) + metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") + matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")} + chunks, err := store.getMetricNameChunks(ctx, startTime, endTime, + matchers, 
metricNameLabel.Value) + require.NoError(t, err) - metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") - matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")} + // We need to check that each chunk is in the time range + for _, chunk := range chunks { + assert.False(t, chunk.From.After(endTime)) + assert.False(t, chunk.Through.Before(startTime)) + samples, err := chunk.Samples(chunk.From, chunk.Through) + assert.NoError(t, err) + assert.Equal(t, 1, len(samples)) + // TODO verify chunk contents + } - for _, s := range schemas { - chunks, err := s.store.getMetricNameChunks(ctx, startTime, endTime, - matchers, - metricNameLabel.Value, - ) - require.NoError(t, err) - - // We need to check that each chunk is in the time range - for _, chunk := range chunks { - assert.False(t, chunk.From.After(endTime)) - assert.False(t, chunk.Through.Before(startTime)) - samples, err := chunk.Samples(chunk.From, chunk.Through) - assert.NoError(t, err) - assert.Equal(t, 1, len(samples)) - // TODO verify chunk contents + // And check we got all the chunks we want + numChunks := (end / chunkLen) - (start / chunkLen) + 1 + assert.Equal(t, int(numChunks), len(chunks)) } - - // And check we got all the chunks we want - numChunks := (end / chunkLen) - (start / chunkLen) + 1 - assert.Equal(t, int(numChunks), len(chunks), s.name) - } + }) } } func TestChunkStoreLeastRead(t *testing.T) { // Test we don't read too much from the index ctx := user.InjectOrgID(context.Background(), userID) - store := newTestChunkStore(t, StoreConfig{ - schemaFactory: v6Schema, - QueryChunkLimit: 2e6, - }) + store := newTestChunkStore(t, v6Schema) // Put 24 chunks 1hr chunks in the store const chunkLen = 60 // in seconds diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go new file mode 100644 index 00000000000..17428da6c3a --- /dev/null +++ b/pkg/chunk/composite_store.go @@ -0,0 +1,194 @@ +package chunk + +import ( + "context" + 
"fmt" + "sort" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" +) + +// Store for chunks. +type Store interface { + Put(ctx context.Context, chunks []Chunk) error + Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) + Stop() +} + +// compositeStore is a Store which delegates to various stores depending +// on when they were activated. +type compositeStore struct { + stores []compositeStoreEntry +} + +type compositeStoreEntry struct { + start model.Time + Store +} + +type byStart []compositeStoreEntry + +func (a byStart) Len() int { return len(a) } +func (a byStart) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byStart) Less(i, j int) bool { return a[i].start < a[j].start } + +// NewCompositeStore creates a new Store which delegates to different stores depending +// on time. +func NewCompositeStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (Store, error) { + store, err := newStore(cfg, v1Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + + stores := []compositeStoreEntry{ + {0, store}, + } + + if schemaCfg.DailyBucketsFrom.IsSet() { + store, err := newStore(cfg, v2Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.DailyBucketsFrom.Time, store}) + } + + if schemaCfg.Base64ValuesFrom.IsSet() { + store, err := newStore(cfg, v3Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.Base64ValuesFrom.Time, store}) + } + + if schemaCfg.V4SchemaFrom.IsSet() { + store, err := newStore(cfg, v4Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V4SchemaFrom.Time, store}) + } + + if schemaCfg.V5SchemaFrom.IsSet() { + store, err := newStore(cfg, v5Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, 
compositeStoreEntry{schemaCfg.V5SchemaFrom.Time, store}) + } + + if schemaCfg.V6SchemaFrom.IsSet() { + store, err := newStore(cfg, v6Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V6SchemaFrom.Time, store}) + } + + if schemaCfg.V7SchemaFrom.IsSet() { + store, err := newStore(cfg, v7Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V7SchemaFrom.Time, store}) + } + + if schemaCfg.V8SchemaFrom.IsSet() { + store, err := newStore(cfg, v8Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V8SchemaFrom.Time, store}) + } + + if !sort.IsSorted(byStart(stores)) { + return nil, fmt.Errorf("schemas not in time-sorted order") + } + + return compositeStore{stores}, nil +} + +func (c compositeStore) Put(ctx context.Context, chunks []Chunk) error { + for _, chunk := range chunks { + err := c.forStores(chunk.From, chunk.From, func(_, _ model.Time, store Store) error { + return store.Put(ctx, []Chunk{chunk}) + }) + if err != nil { + return err + } + } + return nil +} + +func (c compositeStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) { + var results []Chunk + err := c.forStores(from, through, func(from, through model.Time, store Store) error { + chunks, err := store.Get(ctx, from, through, matchers...) + if err != nil { + return err + } + results = append(results, chunks...) 
+ return nil + }) + return results, err +} + +func (c compositeStore) Stop() { + for _, store := range c.stores { + store.Stop() + } +} + +func (c compositeStore) forStores(from, through model.Time, callback func(from, through model.Time, store Store) error) error { + if len(c.stores) == 0 { + return nil + } + + // first, find the schema with the highest start _before or at_ from + i := sort.Search(len(c.stores), func(i int) bool { + return c.stores[i].start > from + }) + if i > 0 { + i-- + } else { + // This could happen if we get passed a sample from before 1970. + i = 0 + from = c.stores[0].start + } + + // next, find the schema with the lowest start _after_ through + j := sort.Search(len(c.stores), func(j int) bool { + return c.stores[j].start > through + }) + + min := func(a, b model.Time) model.Time { + if a < b { + return a + } + return b + } + + start := from + for ; i < j; i++ { + nextSchemaStarts := model.Latest + if i+1 < len(c.stores) { + nextSchemaStarts = c.stores[i+1].start + } + + // If the next schema starts at the same time as this one, + // skip this one. 
+ if nextSchemaStarts == c.stores[i].start { + continue + } + + end := min(through, nextSchemaStarts-1) + err := callback(start, end, c.stores[i].Store) + if err != nil { + return err + } + + start = nextSchemaStarts + } + + return nil +} diff --git a/pkg/chunk/composite_store_test.go b/pkg/chunk/composite_store_test.go new file mode 100644 index 00000000000..c315aef0d27 --- /dev/null +++ b/pkg/chunk/composite_store_test.go @@ -0,0 +1,167 @@ +package chunk + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/test" +) + +type mockStore int + +func (m mockStore) Put(ctx context.Context, chunks []Chunk) error { + return nil +} + +func (m mockStore) Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) { + return nil, nil +} + +func (m mockStore) Stop() {} + +func TestCompositeStore(t *testing.T) { + type result struct { + from, through model.Time + store Store + } + collect := func(results *[]result) func(from, through model.Time, store Store) error { + return func(from, through model.Time, store Store) error { + *results = append(*results, result{from, through, store}) + return nil + } + } + cs := compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + {model.TimeFromUnix(100), mockStore(2)}, + {model.TimeFromUnix(200), mockStore(3)}, + }, + } + + for i, tc := range []struct { + cs compositeStore + from, through int64 + want []result + }{ + // Test we have sensible results when there are no schema's defined + {compositeStore{}, 0, 1, []result{}}, + + // Test we have sensible results when there is a single schema + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + }, + }, + 0, 10, + []result{ + {model.TimeFromUnix(0), model.TimeFromUnix(10), mockStore(1)}, + }, + }, + + // Test we have sensible results for negative (ie pre 
1970) times + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + }, + }, + -10, -9, + []result{}, + }, + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + }, + }, + -10, 10, + []result{ + {model.TimeFromUnix(0), model.TimeFromUnix(10), mockStore(1)}, + }, + }, + + // Test we have sensible results when there is two schemas + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + {model.TimeFromUnix(100), mockStore(2)}, + }, + }, + 34, 165, + []result{ + {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockStore(1)}, + {model.TimeFromUnix(100), model.TimeFromUnix(165), mockStore(2)}, + }, + }, + + // Test we get only one result when two schema start at same time + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + {model.TimeFromUnix(10), mockStore(2)}, + {model.TimeFromUnix(10), mockStore(3)}, + }, + }, + 0, 165, + []result{ + {model.TimeFromUnix(0), model.TimeFromUnix(10) - 1, mockStore(1)}, + {model.TimeFromUnix(10), model.TimeFromUnix(165), mockStore(3)}, + }, + }, + + // Test all the various combination we can get when there are three schemas + { + cs, 34, 65, + []result{ + {model.TimeFromUnix(34), model.TimeFromUnix(65), mockStore(1)}, + }, + }, + + { + cs, 244, 6785, + []result{ + {model.TimeFromUnix(244), model.TimeFromUnix(6785), mockStore(3)}, + }, + }, + + { + cs, 34, 165, + []result{ + {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockStore(1)}, + {model.TimeFromUnix(100), model.TimeFromUnix(165), mockStore(2)}, + }, + }, + + { + cs, 151, 264, + []result{ + {model.TimeFromUnix(151), model.TimeFromUnix(200) - 1, mockStore(2)}, + {model.TimeFromUnix(200), model.TimeFromUnix(264), mockStore(3)}, + }, + }, + + { + cs, 32, 264, + []result{ + {model.TimeFromUnix(32), model.TimeFromUnix(100) - 1, mockStore(1)}, + {model.TimeFromUnix(100), model.TimeFromUnix(200) - 1, 
mockStore(2)}, + {model.TimeFromUnix(200), model.TimeFromUnix(264), mockStore(3)}, + }, + }, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + have := []result{} + tc.cs.forStores(model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), collect(&have)) + if !reflect.DeepEqual(tc.want, have) { + t.Fatalf("wrong stores - %s", test.Diff(tc.want, have)) + } + }) + } +} diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index d9e65862bef..44d2b85306e 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -3,7 +3,6 @@ package chunk import ( "flag" "fmt" - "sort" "strconv" "time" @@ -282,202 +281,3 @@ func (cfg *PeriodicTableConfig) TableFor(t model.Time) string { ) return cfg.Prefix + strconv.Itoa(int(table)) } - -// compositeSchema is a Schema which delegates to various schemas depending -// on when they were activated. -type compositeSchema struct { - schemas []compositeSchemaEntry -} - -type compositeSchemaEntry struct { - start model.Time - Schema -} - -type byStart []compositeSchemaEntry - -func (a byStart) Len() int { return len(a) } -func (a byStart) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byStart) Less(i, j int) bool { return a[i].start < a[j].start } - -func newCompositeSchema(cfg SchemaConfig) (Schema, error) { - schemas := []compositeSchemaEntry{ - {0, v1Schema(cfg)}, - } - - if cfg.DailyBucketsFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.DailyBucketsFrom.Time, v2Schema(cfg)}) - } - - if cfg.Base64ValuesFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.Base64ValuesFrom.Time, v3Schema(cfg)}) - } - - if cfg.V4SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V4SchemaFrom.Time, v4Schema(cfg)}) - } - - if cfg.V5SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V5SchemaFrom.Time, v5Schema(cfg)}) - } - - if cfg.V6SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V6SchemaFrom.Time, 
v6Schema(cfg)}) - } - - if cfg.V7SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V7SchemaFrom.Time, v7Schema(cfg)}) - } - - if cfg.V8SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V8SchemaFrom.Time, v8Schema(cfg)}) - } - - if !sort.IsSorted(byStart(schemas)) { - return nil, fmt.Errorf("schemas not in time-sorted order") - } - - return compositeSchema{schemas}, nil -} - -func (c compositeSchema) forSchemasIndexQuery(from, through model.Time, callback func(from, through model.Time, schema Schema) ([]IndexQuery, error)) ([]IndexQuery, error) { - if len(c.schemas) == 0 { - return nil, nil - } - - // first, find the schema with the highest start _before or at_ from - i := sort.Search(len(c.schemas), func(i int) bool { - return c.schemas[i].start > from - }) - if i > 0 { - i-- - } else { - // This could happen if we get passed a sample from before 1970. - i = 0 - from = c.schemas[0].start - } - - // next, find the schema with the lowest start _after_ through - j := sort.Search(len(c.schemas), func(j int) bool { - return c.schemas[j].start > through - }) - - min := func(a, b model.Time) model.Time { - if a < b { - return a - } - return b - } - - start := from - result := []IndexQuery{} - for ; i < j; i++ { - nextSchemaStarts := model.Latest - if i+1 < len(c.schemas) { - nextSchemaStarts = c.schemas[i+1].start - } - - // If the next schema starts at the same time as this one, - // skip this one. - if nextSchemaStarts == c.schemas[i].start { - continue - } - - end := min(through, nextSchemaStarts-1) - entries, err := callback(start, end, c.schemas[i].Schema) - if err != nil { - return nil, err - } - - result = append(result, entries...) 
- start = nextSchemaStarts - } - - return result, nil -} - -func (c compositeSchema) forSchemasIndexEntry(from, through model.Time, callback func(from, through model.Time, schema Schema) ([]IndexEntry, error)) ([]IndexEntry, error) { - if len(c.schemas) == 0 { - return nil, nil - } - - // first, find the schema with the highest start _before or at_ from - i := sort.Search(len(c.schemas), func(i int) bool { - return c.schemas[i].start > from - }) - if i > 0 { - i-- - } else { - // This could happen if we get passed a sample from before 1970. - i = 0 - from = c.schemas[0].start - } - - // next, find the schema with the lowest start _after_ through - j := sort.Search(len(c.schemas), func(j int) bool { - return c.schemas[j].start > through - }) - - min := func(a, b model.Time) model.Time { - if a < b { - return a - } - return b - } - - start := from - result := []IndexEntry{} - for ; i < j; i++ { - nextSchemaStarts := model.Latest - if i+1 < len(c.schemas) { - nextSchemaStarts = c.schemas[i+1].start - } - - // If the next schema starts at the same time as this one, - // skip this one. - if nextSchemaStarts == c.schemas[i].start { - continue - } - - end := min(through, nextSchemaStarts-1) - entries, err := callback(start, end, c.schemas[i].Schema) - if err != nil { - return nil, err - } - - result = append(result, entries...) 
- start = nextSchemaStarts - } - - return result, nil -} - -func (c compositeSchema) GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { - return c.forSchemasIndexEntry(from, through, func(from, through model.Time, schema Schema) ([]IndexEntry, error) { - return schema.GetWriteEntries(from, through, userID, metricName, labels, chunkID) - }) -} - -func (c compositeSchema) GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueries(from, through, userID) - }) -} - -func (c compositeSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueriesForMetric(from, through, userID, metricName) - }) -} - -func (c compositeSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName) - }) -} - -func (c compositeSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue) - }) -} diff --git a/pkg/chunk/schema_config_test.go b/pkg/chunk/schema_config_test.go index 
2bf6e1d1d41..3ff0455e2d3 100644 --- a/pkg/chunk/schema_config_test.go +++ b/pkg/chunk/schema_config_test.go @@ -1,12 +1,10 @@ package chunk import ( - "fmt" "reflect" "testing" "github.com/prometheus/common/model" - "github.com/weaveworks/common/test" ) func TestHourlyBuckets(t *testing.T) { @@ -180,146 +178,3 @@ func TestDailyBuckets(t *testing.T) { }) } } - -func TestCompositeSchema(t *testing.T) { - type result struct { - from, through model.Time - schema Schema - } - collect := func(results *[]result) func(from, through model.Time, schema Schema) ([]IndexEntry, error) { - return func(from, through model.Time, schema Schema) ([]IndexEntry, error) { - *results = append(*results, result{from, through, schema}) - return nil, nil - } - } - cs := compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - {model.TimeFromUnix(100), mockSchema(2)}, - {model.TimeFromUnix(200), mockSchema(3)}, - }, - } - - for i, tc := range []struct { - cs compositeSchema - from, through int64 - want []result - }{ - // Test we have sensible results when there are no schema's defined - {compositeSchema{}, 0, 1, []result{}}, - - // Test we have sensible results when there is a single schema - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - }, - }, - 0, 10, - []result{ - {model.TimeFromUnix(0), model.TimeFromUnix(10), mockSchema(1)}, - }, - }, - - // Test we have sensible results for negative (ie pre 1970) times - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - }, - }, - -10, -9, - []result{}, - }, - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - }, - }, - -10, 10, - []result{ - {model.TimeFromUnix(0), model.TimeFromUnix(10), mockSchema(1)}, - }, - }, - - // Test we have sensible results when there is two schemas - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), 
mockSchema(1)}, - {model.TimeFromUnix(100), mockSchema(2)}, - }, - }, - 34, 165, - []result{ - {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockSchema(1)}, - {model.TimeFromUnix(100), model.TimeFromUnix(165), mockSchema(2)}, - }, - }, - - // Test we get only one result when two schema start at same time - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - {model.TimeFromUnix(10), mockSchema(2)}, - {model.TimeFromUnix(10), mockSchema(3)}, - }, - }, - 0, 165, - []result{ - {model.TimeFromUnix(0), model.TimeFromUnix(10) - 1, mockSchema(1)}, - {model.TimeFromUnix(10), model.TimeFromUnix(165), mockSchema(3)}, - }, - }, - - // Test all the various combination we can get when there are three schemas - { - cs, 34, 65, - []result{ - {model.TimeFromUnix(34), model.TimeFromUnix(65), mockSchema(1)}, - }, - }, - - { - cs, 244, 6785, - []result{ - {model.TimeFromUnix(244), model.TimeFromUnix(6785), mockSchema(3)}, - }, - }, - - { - cs, 34, 165, - []result{ - {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockSchema(1)}, - {model.TimeFromUnix(100), model.TimeFromUnix(165), mockSchema(2)}, - }, - }, - - { - cs, 151, 264, - []result{ - {model.TimeFromUnix(151), model.TimeFromUnix(200) - 1, mockSchema(2)}, - {model.TimeFromUnix(200), model.TimeFromUnix(264), mockSchema(3)}, - }, - }, - - { - cs, 32, 264, - []result{ - {model.TimeFromUnix(32), model.TimeFromUnix(100) - 1, mockSchema(1)}, - {model.TimeFromUnix(100), model.TimeFromUnix(200) - 1, mockSchema(2)}, - {model.TimeFromUnix(200), model.TimeFromUnix(264), mockSchema(3)}, - }, - }, - } { - t.Run(fmt.Sprintf("TestSchemaComposite[%d]", i), func(t *testing.T) { - have := []result{} - tc.cs.forSchemasIndexEntry(model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), collect(&have)) - if !reflect.DeepEqual(tc.want, have) { - t.Fatalf("wrong schemas - %s", test.Diff(tc.want, have)) - } - }) - } -} diff --git a/pkg/chunk/schema_test.go b/pkg/chunk/schema_test.go 
index 70403f6d68c..73a7d7e2cb7 100644 --- a/pkg/chunk/schema_test.go +++ b/pkg/chunk/schema_test.go @@ -17,24 +17,6 @@ import ( "github.com/weaveworks/cortex/pkg/util" ) -type mockSchema int - -func (mockSchema) GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { - return nil, nil -} -func (mockSchema) GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) { - return nil, nil -} -func (mockSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) { - return nil, nil -} -func (mockSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) { - return nil, nil -} -func (mockSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { - return nil, nil -} - type ByHashRangeKey []IndexEntry func (a ByHashRangeKey) Len() int { return len(a) } @@ -81,15 +63,6 @@ func TestSchemaHashKeys(t *testing.T) { From: util.NewDayValue(model.TimeFromUnix(5 * 24 * 60 * 60)), }, } - compositeSchema := func(dailyBucketsFrom model.Time) Schema { - cfgCp := cfg - cfgCp.DailyBucketsFrom = util.NewDayValue(dailyBucketsFrom) - schema, err := newCompositeSchema(cfgCp) - if err != nil { - t.Fatal(err) - } - return schema - } hourlyBuckets := v1Schema(cfg) dailyBuckets := v3Schema(cfg) labelBuckets := v4Schema(cfg) @@ -134,83 +107,6 @@ func TestSchemaHashKeys(t *testing.T) { mkResult(table, "userid:d%d:foo:bar", 0, 3), ), }, - - // Buckets are by hour until we reach the `dailyBucketsFrom`, after which they are by day. 
- { - compositeSchema(model.TimeFromUnix(0).Add(1 * 24 * time.Hour)), - 0, (3 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:%d:foo", 0, 1*24), - mkResult(table, "userid:d%d:foo", 1, 3), - ), - }, - - // Only the day part of `dailyBucketsFrom` matters, not the time part. - { - compositeSchema(model.TimeFromUnix(0).Add(2*24*time.Hour) - 1), - 0, (3 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:%d:foo", 0, 1*24), - mkResult(table, "userid:d%d:foo", 1, 3), - ), - }, - - // Moving dailyBucketsFrom to the previous day compared to the above makes 24 1-hour buckets disappear. - { - compositeSchema(model.TimeFromUnix(0).Add(1*24*time.Hour) - 1), - 0, (3 * 24 * 60 * 60) - 1, "foo", - mkResult(table, "userid:d%d:foo", 0, 3), - }, - - // If `dailyBucketsFrom` is after the interval, everything will be bucketed by hour. - { - compositeSchema(model.TimeFromUnix(0).Add(99 * 24 * time.Hour)), - 0, (2 * 24 * 60 * 60) - 1, "foo", - mkResult(table, "userid:%d:foo", 0, 2*24), - }, - - // Should only return daily buckets when dailyBucketsFrom is before the interval. - { - compositeSchema(model.TimeFromUnix(0)), - 1 * 24 * 60 * 60, (3 * 24 * 60 * 60) - 1, "foo", - mkResult(table, "userid:d%d:foo", 1, 3), - }, - - // Basic weekly- ables. - { - compositeSchema(model.TimeFromUnix(0)), - 5 * 24 * 60 * 60, (10 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(periodicPrefix+"2", "userid:d%d:foo", 5, 6), - mkResult(periodicPrefix+"3", "userid:d%d:foo", 6, 8), - mkResult(periodicPrefix+"4", "userid:d%d:foo", 8, 10), - ), - }, - - // Daily buckets + weekly tables. - { - compositeSchema(model.TimeFromUnix(0)), - 0, (10 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:d%d:foo", 0, 5), - mkResult(periodicPrefix+"2", "userid:d%d:foo", 5, 6), - mkResult(periodicPrefix+"3", "userid:d%d:foo", 6, 8), - mkResult(periodicPrefix+"4", "userid:d%d:foo", 8, 10), - ), - }, - - // Houly Buckets, then daily buckets, then weekly tables. 
- { - compositeSchema(model.TimeFromUnix(2 * 24 * 60 * 60)), - 0, (10 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:%d:foo", 0, 2*24), - mkResult(table, "userid:d%d:foo", 2, 5), - mkResult(periodicPrefix+"2", "userid:d%d:foo", 5, 6), - mkResult(periodicPrefix+"3", "userid:d%d:foo", 6, 8), - mkResult(periodicPrefix+"4", "userid:d%d:foo", 8, 10), - ), - }, } { t.Run(fmt.Sprintf("TestSchemaHashKeys[%d]", i), func(t *testing.T) { have, err := tc.Schema.GetWriteEntries( From 7919f863874219e8a4ab15ca4d4788c1bc0e19c7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 11 Jul 2018 21:06:00 +0100 Subject: [PATCH 2/6] Schema v9: Index series, not chunks AND seriesStore, a chunk store for this schema. I tried to adapt the original chunk store to support this style of indexing - easy on the right path, but the read path became even more of a rats nest. So I factored out the common bits as best I could and made a new chunk store - the seriesStore. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store.go | 198 +++++++------------------ pkg/chunk/chunk_store_test.go | 81 +++++----- pkg/chunk/chunk_store_utils.go | 168 +++++++++++++++++++++ pkg/chunk/composite_store.go | 8 + pkg/chunk/inmemory_storage_client.go | 18 ++- pkg/chunk/schema.go | 133 ++++++++++++++++- pkg/chunk/schema_config.go | 2 + pkg/chunk/schema_test.go | 2 +- pkg/chunk/schema_util.go | 54 +++++-- pkg/chunk/schema_util_test.go | 2 +- pkg/chunk/series_store.go | 211 +++++++++++++++++++++++++++ 11 files changed, 664 insertions(+), 213 deletions(-) create mode 100644 pkg/chunk/chunk_store_utils.go create mode 100644 pkg/chunk/series_store.go diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index ef850fe2966..db31bd1fb56 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -7,10 +7,7 @@ import ( "sort" "time" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - ot "github.com/opentracing/opentracing-go" - otlog 
"github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -71,21 +68,21 @@ type store struct { cfg StoreConfig storage StorageClient - cache cache.Cache schema Schema + *chunkFetcher } -func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (*store, error) { - cache, err := cache.New(cfg.CacheConfig) +func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { + fetcher, err := newChunkFetcher(cfg.CacheConfig, storage) if err != nil { return nil, err } return &store{ - cfg: cfg, - storage: storage, - schema: schema, - cache: cache, + cfg: cfg, + storage: storage, + schema: schema, + chunkFetcher: fetcher, }, nil } @@ -111,7 +108,7 @@ func (c *store) Put(ctx context.Context, chunks []Chunk) error { } func (c *store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { - writeReqs, err := c.calculateDynamoWrites(userID, chunks) + writeReqs, err := c.calculateIndexEntries(userID, chunks) if err != nil { return err } @@ -119,9 +116,8 @@ func (c *store) updateIndex(ctx context.Context, userID string, chunks []Chunk) return c.storage.BatchWrite(ctx, writeReqs) } -// calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all -// the chunks it is given. -func (c *store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch, error) { +// calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given. 
+func (c *store) calculateIndexEntries(userID string, chunks []Chunk) (WriteBatch, error) { seenIndexEntries := map[string]struct{}{} writeReqs := c.storage.NewWriteBatch() @@ -138,85 +134,74 @@ func (c *store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch indexEntriesPerChunk.Observe(float64(len(entries))) // Remove duplicate entries based on tableName:hashValue:rangeValue - unseenEntries := []IndexEntry{} for _, entry := range entries { key := fmt.Sprintf("%s:%s:%x", entry.TableName, entry.HashValue, entry.RangeValue) if _, ok := seenIndexEntries[key]; !ok { seenIndexEntries[key] = struct{}{} - unseenEntries = append(unseenEntries, entry) + rowWrites.Observe(entry.HashValue, 1) + writeReqs.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) } } - - for _, entry := range unseenEntries { - rowWrites.Observe(entry.HashValue, 1) - writeReqs.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) - } } return writeReqs, nil } -// spanLogger unifies tracing and logging, to reduce repetition. -type spanLogger struct { - log.Logger - ot.Span -} - -func newSpanLogger(ctx context.Context, method string) (*spanLogger, context.Context) { - span, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") - return &spanLogger{ - Logger: log.With(util.WithContext(ctx, util.Logger), "method", method), - Span: span, - }, ctx -} +// Get implements Store +func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.Get") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers)) -func (s *spanLogger) Log(kvps ...interface{}) error { - s.Logger.Log(kvps...) - fields, err := otlog.InterleavedKVToFields(kvps...) + // Validate the query is within reasonable bounds. 
+ shortcut, err := c.validateQuery(ctx, from, &through) if err != nil { - return err + return nil, err + } else if shortcut { + return nil, nil } - s.Span.LogFields(fields...) - return nil + + // Fetch metric name chunks if the matcher is of type equal, + metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) + if ok && metricNameMatcher.Type == labels.MatchEqual { + log.Span.SetTag("metric", metricNameMatcher.Value) + return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value) + } + + // Otherwise we consult the metric name index first and then create queries for each matching metric name. + return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher) } -// Get implements ChunkStore -func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { - log, ctx := newSpanLogger(ctx, "ChunkStore.Get") +func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time) (shortcut bool, err error) { + log, ctx := newSpanLogger(ctx, "store.validateQuery") defer log.Span.Finish() now := model.Now() - level.Debug(log).Log("from", from, "through", through, "now", now, "matchers", len(allMatchers)) - if through < from { - return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from) + if *through < from { + err = fmt.Errorf("invalid query, through < from (%d < %d)", through, from) + return } if from.After(now) { // time-span start is in future ... regard as legal level.Error(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) - return nil, nil + shortcut = true + return } if from.After(now.Add(-c.cfg.MinChunkAge)) { // no data relevant to this query will have arrived at the store yet - return nil, nil + shortcut = true + return } if through.After(now.Add(5 * time.Minute)) { // time-span end is in future ... 
regard as legal level.Error(log).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now) - through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes - } - - // Fetch metric name chunks if the matcher is of type equal, - metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) - if ok && metricNameMatcher.Type == labels.MatchEqual { - log.Span.SetTag("metric", metricNameMatcher.Value) - return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value) + *through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes } - // Otherwise we consult the metric name index first and then create queries for each matching metric name. - return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher) + return } func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { @@ -232,15 +217,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim level.Debug(log).Log("Chunks in index", len(chunks)) // Filter out chunks that are not in the selected time range. 
- filtered := make([]Chunk, 0, len(chunks)) - keys := make([]string, 0, len(chunks)) - for _, chunk := range chunks { - if chunk.Through < from || through < chunk.From { - continue - } - filtered = append(filtered, chunk) - keys = append(keys, chunk.ExternalKey()) - } + filtered, keys := filterChunksByTime(from, through, chunks) level.Debug(log).Log("Chunks post filtering", len(chunks)) if len(filtered) > c.cfg.QueryChunkLimit { @@ -250,79 +227,16 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim } // Now fetch the actual chunk data from Memcache / S3 - cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) - if err != nil { - level.Warn(log).Log("msg", "error fetching from cache", "err", err) - } - - fromCache, missing, err := ProcessCacheResponse(filtered, cacheHits, cacheBufs) - if err != nil { - level.Warn(log).Log("msg", "error fetching from cache", "err", err) - } - - fromStorage, err := c.storage.GetChunks(ctx, missing) - - // Always cache any chunks we did get - if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil { - level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) - } - + allChunks, err := c.fetchChunks(ctx, filtered, keys) if err != nil { return nil, promql.ErrStorage(err) } - allChunks := append(fromCache, fromStorage...) - - // Filter out chunks - filteredChunks := make([]Chunk, 0, len(allChunks)) -outer: - for _, chunk := range allChunks { - for _, filter := range filters { - if !filter.Matches(string(chunk.Metric[model.LabelName(filter.Name)])) { - continue outer - } - } - filteredChunks = append(filteredChunks, chunk) - } - + // Filter out chunks based on the empty matchers in the query. + filteredChunks := filterChunksByMatchers(allChunks, filters) return filteredChunks, nil } -// ProcessCacheResponse decodes the chunks coming back from the cache, separating -// hits and misses. 
-func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { - decodeContext := NewDecodeContext() - - i, j := 0, 0 - for i < len(chunks) && j < len(keys) { - chunkKey := chunks[i].ExternalKey() - - if chunkKey < keys[j] { - missing = append(missing, chunks[i]) - i++ - } else if chunkKey > keys[j] { - level.Debug(util.Logger).Log("msg", "got chunk from cache we didn't ask for") - j++ - } else { - chunk := chunks[i] - err = chunk.Decode(decodeContext, bufs[j]) - if err != nil { - cacheCorrupt.Inc() - return - } - found = append(found, chunk) - i++ - j++ - } - } - - for ; i < len(chunks); i++ { - missing = append(missing, chunks[i]) - } - - return -} - func (c *store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) { // Get all series from the index userID, err := user.ExtractOrgID(ctx) @@ -475,8 +389,7 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode if lastErr != nil { return nil, lastErr } - - level.Debug(log).Log("msg", "post intersection", "entries", len(chunkIDs)) + level.Debug(log).Log("msg", "post intersection", "chunkIDs", len(chunkIDs)) // Convert IndexEntry's into chunks return c.convertChunkIDsToChunks(ctx, chunkIDs) @@ -536,7 +449,7 @@ func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, mat result := make([]string, 0, len(entries)) for _, entry := range entries { - chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + chunkKey, labelValue, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) if err != nil { return nil, err } @@ -571,16 +484,3 @@ func (c *store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) return chunkSet, nil } - -func (c *store) writeBackCache(ctx context.Context, chunks []Chunk) error { - for i := range chunks { - encoded, err := chunks[i].Encode() - if 
err != nil { - return err - } - if err := c.cache.StoreChunk(ctx, chunks[i].ExternalKey(), encoded); err != nil { - return err - } - } - return nil -} diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 716c5cfa538..dc58ae412dd 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -22,23 +22,28 @@ import ( "github.com/weaveworks/common/user" ) +type schemaFactory func(cfg SchemaConfig) Schema +type storeFactory func(StoreConfig, Schema, StorageClient) (Store, error) + var schemas = []struct { name string - fn func(cfg SchemaConfig) Schema + schemaFn schemaFactory + storeFn storeFactory requireMetricName bool }{ - {"v1 schema", v1Schema, true}, - {"v2 schema", v2Schema, true}, - {"v3 schema", v3Schema, true}, - {"v4 schema", v4Schema, true}, - {"v5 schema", v5Schema, true}, - {"v6 schema", v6Schema, true}, - {"v7 schema", v7Schema, true}, - {"v8 schema", v8Schema, false}, + {"v1 schema", v1Schema, newStore, true}, + {"v2 schema", v2Schema, newStore, true}, + {"v3 schema", v3Schema, newStore, true}, + {"v4 schema", v4Schema, newStore, true}, + {"v5 schema", v5Schema, newStore, true}, + {"v6 schema", v6Schema, newStore, true}, + {"v7 schema", v7Schema, newStore, true}, + {"v8 schema", v8Schema, newStore, false}, + {"v9 schema", v9Schema, newSeriesStore, true}, } // newTestStore creates a new Store for testing. 
-func newTestChunkStore(t *testing.T, schemaFn func(SchemaConfig) Schema) *store { +func newTestChunkStore(t *testing.T, schemaFactory schemaFactory, storeFactory storeFactory) Store { var ( storeCfg StoreConfig schemaCfg SchemaConfig @@ -52,7 +57,7 @@ func newTestChunkStore(t *testing.T, schemaFn func(SchemaConfig) Schema) *store err = tableManager.SyncTables(context.Background()) require.NoError(t, err) - store, err := newStore(storeCfg, schemaFn(schemaCfg), storage) + store, err := storeFactory(storeCfg, schemaFactory(schemaCfg), storage) require.NoError(t, err) return store } @@ -201,7 +206,7 @@ func TestChunkStore_Get(t *testing.T) { for _, schema := range schemas { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, schema.fn) + store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) if err := store.Put(ctx, []Chunk{ fooChunk1, @@ -231,8 +236,6 @@ func TestChunkStore_Get(t *testing.T) { sort.Sort(ByFingerprint(matrix1)) if !reflect.DeepEqual(tc.expect, matrix1) { - t.Fatalf("jml\nstart = %#v\nnow = %#v\nfooChunk1 = %#v\nfooChunk2 = %#v\nbarChunk1 = %#v\nbarChunk2 = %#v\n", - now.Add(-time.Hour), now, fooChunk1, fooChunk2, barChunk1, barChunk2) t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, matrix1)) } @@ -264,7 +267,6 @@ func TestChunkStore_Get(t *testing.T) { func TestChunkStore_getMetricNameChunks(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) now := model.Now() - metricName := "foo" chunk1 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", "bar": "baz", @@ -278,65 +280,61 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { }) for _, tc := range []struct { - query string - expect []Chunk - matchers []*labels.Matcher + query string + expect []Chunk }{ { `foo`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{}, }, { `foo{flip=""}`, []Chunk{chunk2}, - 
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "flip", "")}, }, { `foo{bar="baz"}`, []Chunk{chunk1}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")}, }, { `foo{bar="beep"}`, []Chunk{chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "beep")}, }, { `foo{toms="code"}`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code")}, }, { `foo{bar!="baz"}`, []Chunk{chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchNotEqual, "bar", "baz")}, }, { `foo{bar=~"beep|baz"}`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchRegexp, "bar", "beep|baz")}, }, { `foo{toms="code", bar=~"beep|baz"}`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code"), mustNewLabelMatcher(labels.MatchRegexp, "bar", "beep|baz")}, }, { `foo{toms="code", bar="baz"}`, - []Chunk{chunk1}, []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code"), mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")}, + []Chunk{chunk1}, }, } { for _, schema := range schemas { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, schema.fn) + store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { t.Fatal(err) } - chunks, err := store.getMetricNameChunks(ctx, now.Add(-time.Hour), now, tc.matchers, metricName) + matchers, err := promql.ParseMetricSelector(tc.query) + if err != nil { + t.Fatal(err) + } + + chunks, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) 
require.NoError(t, err) if !reflect.DeepEqual(tc.expect, chunks) { @@ -360,7 +358,7 @@ func TestChunkStoreRandom(t *testing.T) { for _, schema := range schemas { t.Run(schema.name, func(t *testing.T) { - store := newTestChunkStore(t, schema.fn) + store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) // put 100 chunks from 0 to 99 const chunkLen = 13 * 3600 // in seconds @@ -395,10 +393,11 @@ func TestChunkStoreRandom(t *testing.T) { startTime := model.TimeFromUnix(start) endTime := model.TimeFromUnix(end) - metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") - matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")} - chunks, err := store.getMetricNameChunks(ctx, startTime, endTime, - matchers, metricNameLabel.Value) + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"), + mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"), + } + chunks, err := store.Get(ctx, startTime, endTime, matchers...) 
require.NoError(t, err) // We need to check that each chunk is in the time range @@ -422,7 +421,7 @@ func TestChunkStoreRandom(t *testing.T) { func TestChunkStoreLeastRead(t *testing.T) { // Test we don't read too much from the index ctx := user.InjectOrgID(context.Background(), userID) - store := newTestChunkStore(t, v6Schema) + store := newTestChunkStore(t, v6Schema, newStore) // Put 24 chunks 1hr chunks in the store const chunkLen = 60 // in seconds @@ -456,14 +455,12 @@ func TestChunkStoreLeastRead(t *testing.T) { startTime := model.TimeFromUnix(start) endTime := model.TimeFromUnix(end) + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"), + mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"), + } - metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") - matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")} - - chunks, err := store.getMetricNameChunks(ctx, startTime, endTime, - matchers, - metricNameLabel.Value, - ) + chunks, err := store.Get(ctx, startTime, endTime, matchers...) 
if err != nil { t.Fatal(t, err) } diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go new file mode 100644 index 00000000000..2e54c5e3bdf --- /dev/null +++ b/pkg/chunk/chunk_store_utils.go @@ -0,0 +1,168 @@ +package chunk + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + + "github.com/weaveworks/cortex/pkg/chunk/cache" + "github.com/weaveworks/cortex/pkg/util" +) + +func filterChunksByTime(from, through model.Time, chunks []Chunk) ([]Chunk, []string) { + filtered := make([]Chunk, 0, len(chunks)) + keys := make([]string, 0, len(chunks)) + for _, chunk := range chunks { + if chunk.Through < from || through < chunk.From { + continue + } + filtered = append(filtered, chunk) + keys = append(keys, chunk.ExternalKey()) + } + return filtered, keys +} + +func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk { + filteredChunks := make([]Chunk, 0, len(chunks)) +outer: + for _, chunk := range chunks { + for _, filter := range filters { + if !filter.Matches(string(chunk.Metric[model.LabelName(filter.Name)])) { + continue outer + } + } + filteredChunks = append(filteredChunks, chunk) + } + return filteredChunks +} + +// spanLogger unifies tracing and logging, to reduce repetition. +type spanLogger struct { + log.Logger + ot.Span +} + +func newSpanLogger(ctx context.Context, method string) (*spanLogger, context.Context) { + span, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") + return &spanLogger{ + Logger: log.With(util.WithContext(ctx, util.Logger), "method", method), + Span: span, + }, ctx +} + +func (s *spanLogger) Log(kvps ...interface{}) error { + s.Logger.Log(kvps...) + fields, err := otlog.InterleavedKVToFields(kvps...) 
+ if err != nil { + return err + } + s.Span.LogFields(fields...) + return nil +} + +// chunkFetcher deals with fetching chunk contents from the cache/store, +// and writing back any misses to the cache. +type chunkFetcher struct { + storage StorageClient + cache cache.Cache +} + +func newChunkFetcher(cfg cache.Config, storage StorageClient) (*chunkFetcher, error) { + cache, err := cache.New(cfg) + if err != nil { + return nil, err + } + + return &chunkFetcher{ + storage: storage, + cache: cache, + }, nil +} + +func (c *chunkFetcher) Stop() { + c.cache.Stop() +} + +func (c *chunkFetcher) fetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.getMetricNameChunks") + defer log.Span.Finish() + + // Now fetch the actual chunk data from Memcache / S3 + cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) + if err != nil { + level.Warn(log).Log("msg", "error fetching from cache", "err", err) + } + + fromCache, missing, err := ProcessCacheResponse(chunks, cacheHits, cacheBufs) + if err != nil { + level.Warn(log).Log("msg", "error fetching from cache", "err", err) + } + + fromStorage, err := c.storage.GetChunks(ctx, missing) + + // Always cache any chunks we did get + if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil { + level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + } + + if err != nil { + return nil, promql.ErrStorage(err) + } + + allChunks := append(fromCache, fromStorage...) + return allChunks, nil +} + +func (c *chunkFetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { + for i := range chunks { + encoded, err := chunks[i].Encode() + if err != nil { + return err + } + if err := c.cache.StoreChunk(ctx, chunks[i].ExternalKey(), encoded); err != nil { + return err + } + } + return nil +} + +// ProcessCacheResponse decodes the chunks coming back from the cache, separating +// hits and misses. 
+func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { + decodeContext := NewDecodeContext() + + i, j := 0, 0 + for i < len(chunks) && j < len(keys) { + chunkKey := chunks[i].ExternalKey() + + if chunkKey < keys[j] { + missing = append(missing, chunks[i]) + i++ + } else if chunkKey > keys[j] { + level.Debug(util.Logger).Log("msg", "got chunk from cache we didn't ask for") + j++ + } else { + chunk := chunks[i] + err = chunk.Decode(decodeContext, bufs[j]) + if err != nil { + cacheCorrupt.Inc() + return + } + found = append(found, chunk) + i++ + j++ + } + } + + for ; i < len(chunks); i++ { + missing = append(missing, chunks[i]) + } + + return +} diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index 17428da6c3a..075ea6006a6 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -101,6 +101,14 @@ func NewCompositeStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageC stores = append(stores, compositeStoreEntry{schemaCfg.V8SchemaFrom.Time, store}) } + if schemaCfg.V9SchemaFrom.IsSet() { + store, err := newSeriesStore(cfg, v9Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V9SchemaFrom.Time, store}) + } + if !sort.IsSorted(byStart(stores)) { return nil, fmt.Errorf("schemas not in time-sorted order") } diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 3dea4728db9..f6480691330 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -143,7 +143,8 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { // Return error if duplicate write and not metric name entry or series entry itemComponents := decodeRangeKey(items[i].rangeValue) if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) && - !bytes.Equal(itemComponents[3], seriesRangeKeyV1) { + 
!bytes.Equal(itemComponents[3], seriesRangeKeyV1) && + !bytes.Equal(itemComponents[3], labelSeriesRangeKeyV1) { return fmt.Errorf("Dupe write") } } @@ -160,6 +161,8 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { // QueryPages implements StorageClient. func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error { logger := util.WithContext(ctx, util.Logger) + level.Debug(logger).Log("msg", "QueryPages", "query", query.HashValue) + m.mtx.RLock() defer m.mtx.RUnlock() @@ -170,11 +173,12 @@ func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback items, ok := table.items[query.HashValue] if !ok { + level.Debug(logger).Log("msg", "not found") return nil } if query.RangeValuePrefix != nil { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "lookup prefix", "hash", query.HashValue, "range_prefix", query.RangeValuePrefix, "num_items", len(items)) + level.Debug(logger).Log("msg", "lookup prefix", "hash", query.HashValue, "range_prefix", query.RangeValuePrefix, "num_items", len(items)) // the smallest index i in [0, n) at which f(i) is true i := sort.Search(len(items), func(i int) bool { @@ -190,33 +194,33 @@ func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback return !bytes.HasPrefix(items[i+j].rangeValue, query.RangeValuePrefix) }) - level.Debug(util.WithContext(ctx, logger)).Log("msg", "found range", "from_inclusive", i, "to_exclusive", i+j) + level.Debug(logger).Log("msg", "found range", "from_inclusive", i, "to_exclusive", i+j) if i > len(items) || j == 0 { return nil } items = items[i : i+j] } else if query.RangeValueStart != nil { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "lookup range", "hash", query.HashValue, "range_start", query.RangeValueStart, "num_items", len(items)) + level.Debug(logger).Log("msg", "lookup range", "hash", query.HashValue, "range_start", query.RangeValueStart, 
"num_items", len(items)) // the smallest index i in [0, n) at which f(i) is true i := sort.Search(len(items), func(i int) bool { return bytes.Compare(items[i].rangeValue, query.RangeValueStart) >= 0 }) - level.Debug(util.WithContext(ctx, logger)).Log("msg", "found range [%d)", "index", i) + level.Debug(logger).Log("msg", "found range [%d)", "index", i) if i > len(items) { return nil } items = items[i:] } else { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "lookup", "hash", query.HashValue, "num_items", len(items)) + level.Debug(logger).Log("msg", "lookup", "hash", query.HashValue, "num_items", len(items)) } // Filters if query.ValueEqual != nil { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "filter by equality", "value_equal", query.ValueEqual) + level.Debug(logger).Log("msg", "filter by equality", "value_equal", query.ValueEqual) filtered := make([]mockItem, 0) for _, v := range items { diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index 9657fa1cfbf..acb292bc96f 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -18,7 +18,10 @@ var ( chunkTimeRangeKeyV4 = []byte{'4'} chunkTimeRangeKeyV5 = []byte{'5'} metricNameRangeKeyV1 = []byte{'6'} - seriesRangeKeyV1 = []byte{'7'} + + // For v9 schema + seriesRangeKeyV1 = []byte{'7'} + labelSeriesRangeKeyV1 = []byte{'8'} ) // Errors @@ -37,6 +40,9 @@ type Schema interface { GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) + + // If the query resulted in series IDs, use this method to find chunks. 
+ GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) } // IndexQuery describes a query for entries @@ -142,6 +148,14 @@ func v8Schema(cfg SchemaConfig) Schema { } } +// v9 schema indexes series, not chunks. +func v9Schema(cfg SchemaConfig) Schema { + return schema{ + cfg.dailyBuckets, + v9Entries{}, + } +} + // schema implements Schema given a bucketing function and and set of range key callbacks type schema struct { buckets func(from, through model.Time, userID string) []Bucket @@ -217,12 +231,27 @@ func (s schema) GetReadQueriesForMetricLabelValue(from, through model.Time, user return result, nil } +func (s schema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) { + var result []IndexQuery + + buckets := s.buckets(from, through, userID) + for _, bucket := range buckets { + entries, err := s.entries.GetChunksForSeries(bucket, seriesID) + if err != nil { + return nil, err + } + result = append(result, entries...) 
+ } + return result, nil +} + type entries interface { GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) GetReadQueries(bucket Bucket) ([]IndexQuery, error) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) + GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) } type originalEntries struct{} @@ -283,6 +312,10 @@ func (originalEntries) GetReadMetricLabelValueQueries(bucket Bucket, metricName }, nil } +func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + type base64Entries struct { originalEntries } @@ -380,6 +413,10 @@ func (labelNameInHashKeyEntries) GetReadMetricLabelValueQueries(bucket Bucket, m }, nil } +func (labelNameInHashKeyEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + // v5Entries includes chunk end time in range key - see #298. type v5Entries struct{} @@ -441,6 +478,10 @@ func (v5Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. }, nil } +func (v5Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + // v6Entries fixes issues with v5 time encoding being wrong (see #337), and // moves label value out of range key (see #199). type v6Entries struct{} @@ -510,6 +551,10 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. 
}, nil } +func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + // v7Entries is a deprecated scherma initially used to support queries with no metric name. Use v8Entries instead. type v7Entries struct { v6Entries @@ -579,3 +624,89 @@ func (v8Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { }, }, nil } + +// v9Entries adds a layer of indirection between labels -> series -> chunks. +type v9Entries struct { +} + +func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + seriesID := sha256bytes(labels.String()) + encodedThroughBytes := encodeTime(bucket.through) + + entries := []IndexEntry{ + // Entry for metricName -> seriesID + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(metricName), + RangeValue: encodeRangeKey(seriesID, nil, nil, seriesRangeKeyV1), + }, + // Entry for seriesID -> chunkID + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(seriesID), + RangeValue: encodeRangeKey(encodedThroughBytes, nil, []byte(chunkID), chunkTimeRangeKeyV3), + }, + } + + // Entries for metricName:labelName -> hash(value):seriesID + // We use a hash of the value to limit its length. 
+ for key, value := range labels { + if key == model.MetricNameLabel { + continue + } + valueHash := sha256bytes(string(value)) + entries = append(entries, IndexEntry{ + TableName: bucket.tableName, + HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, key), + RangeValue: encodeRangeKey(valueHash, seriesID, nil, labelSeriesRangeKeyV1), + Value: []byte(value), + }) + } + + return entries, nil +} + +func (v9Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + +func (v9Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(metricName), + }, + }, nil +} + +func (v9Entries) GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) { + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName), + }, + }, nil +} + +func (v9Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { + valueHash := sha256bytes(string(labelValue)) + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName), + RangeValueStart: encodeRangeKey(valueHash), + ValueEqual: []byte(labelValue), + }, + }, nil +} + +func (v9Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) { + encodedFromBytes := encodeTime(bucket.from) + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(seriesID), + RangeValueStart: encodeRangeKey(encodedFromBytes), + }, + }, nil +} diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index 44d2b85306e..da608a44f21 100644 --- a/pkg/chunk/schema_config.go +++ 
b/pkg/chunk/schema_config.go @@ -30,6 +30,7 @@ type SchemaConfig struct { V6SchemaFrom util.DayValue V7SchemaFrom util.DayValue V8SchemaFrom util.DayValue + V9SchemaFrom util.DayValue // Master 'off-switch' for table capacity updates, e.g. when troubleshooting ThroughputUpdatesDisabled bool @@ -59,6 +60,7 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.V6SchemaFrom, "dynamodb.v6-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v6 schema.") f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema (Deprecated).") f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema (Deprecated).") + f.Var(&cfg.V9SchemaFrom, "dynamodb.v9-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v9 schema (Series indexing).") f.BoolVar(&cfg.ThroughputUpdatesDisabled, "table-manager.throughput-updates-disabled", false, "If true, disable all changes to DB capacity") f.DurationVar(&cfg.DynamoDBPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") diff --git a/pkg/chunk/schema_test.go b/pkg/chunk/schema_test.go index 73a7d7e2cb7..e99ad66d286 100644 --- a/pkg/chunk/schema_test.go +++ b/pkg/chunk/schema_test.go @@ -394,7 +394,7 @@ func TestSchemaRangeKey(t *testing.T) { _, err := parseMetricNameRangeValue(entry.RangeValue, entry.Value) require.NoError(t, err) case ChunkTimeRangeValue: - _, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + _, _, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) require.NoError(t, err) case SeriesRangeValue: _, err := parseSeriesRangeValue(entry.RangeValue, entry.Value) diff --git a/pkg/chunk/schema_util.go b/pkg/chunk/schema_util.go index ec5d0d9eeaf..bbf5d5074dc 100644 --- a/pkg/chunk/schema_util.go +++ b/pkg/chunk/schema_util.go @@ -19,6 +19,11 @@ func metricSeriesID(m 
model.Metric) string { return string(encodeBase64Bytes(h[:])) } +func sha256bytes(s string) []byte { + h := sha256.Sum256([]byte(s)) + return encodeBase64Bytes(h[:]) +} + func encodeRangeKey(ss ...[]byte) []byte { length := 0 for _, s := range ss { @@ -127,45 +132,70 @@ func parseSeriesRangeValue(rangeValue []byte, value []byte) (model.Metric, error // parseChunkTimeRangeValue returns the chunkKey, labelValue and metadataInIndex // for chunk time range values. -func parseChunkTimeRangeValue(rangeValue []byte, value []byte) (string, model.LabelValue, bool, error) { +func parseChunkTimeRangeValue(rangeValue []byte, value []byte) ( + chunkID string, labelValue model.LabelValue, metadataInIndex bool, + isSeriesID bool, err error, +) { components := decodeRangeKey(rangeValue) switch { case len(components) < 3: - return "", "", false, errors.Errorf("invalid chunk time range value: %x", rangeValue) + err = errors.Errorf("invalid chunk time range value: %x", rangeValue) + return // v1 & v2 schema had three components - label name, label value and chunk ID. // No version number. case len(components) == 3: - return string(components[2]), model.LabelValue(components[1]), true, nil + chunkID = string(components[2]) + labelValue = model.LabelValue(components[1]) + metadataInIndex = true + return // v3 schema had four components - label name, label value, chunk ID and version. // "version" is 1 and label value is base64 encoded. case bytes.Equal(components[3], chunkTimeRangeKeyV1): - labelValue, err := decodeBase64Value(components[1]) - return string(components[2]), labelValue, false, err + chunkID = string(components[2]) + labelValue, err = decodeBase64Value(components[1]) + return // v4 schema wrote v3 range keys and a new range key - version 2, // with four components - , , chunk ID and version. 
case bytes.Equal(components[3], chunkTimeRangeKeyV2): - return string(components[2]), model.LabelValue(""), false, nil + chunkID = string(components[2]) + return // v5 schema version 3 range key is chunk end time, , chunk ID, version case bytes.Equal(components[3], chunkTimeRangeKeyV3): - return string(components[2]), model.LabelValue(""), false, nil + chunkID = string(components[2]) + return // v5 schema version 4 range key is chunk end time, label value, chunk ID, version case bytes.Equal(components[3], chunkTimeRangeKeyV4): - labelValue, err := decodeBase64Value(components[1]) - return string(components[2]), labelValue, false, err + chunkID = string(components[2]) + labelValue, err = decodeBase64Value(components[1]) + return // v6 schema added version 5 range keys, which have the label value written in // to the value, not the range key. So they are [chunk end time, , chunk ID, version]. case bytes.Equal(components[3], chunkTimeRangeKeyV5): - labelValue := model.LabelValue(value) - return string(components[2]), labelValue, false, nil + chunkID = string(components[2]) + labelValue = model.LabelValue(value) + return + + // v9 schema actually returns series IDs + case bytes.Equal(components[3], seriesRangeKeyV1): + chunkID = string(components[0]) + isSeriesID = true + return + + case bytes.Equal(components[3], labelSeriesRangeKeyV1): + chunkID = string(components[1]) + labelValue = model.LabelValue(value) + isSeriesID = true + return default: - return "", model.LabelValue(""), false, fmt.Errorf("unrecognised chunkTimeRangeKey version: '%v'", string(components[3])) + err = fmt.Errorf("unrecognised chunkTimeRangeKey version: '%v'", string(components[3])) + return } } diff --git a/pkg/chunk/schema_util_test.go b/pkg/chunk/schema_util_test.go index cd308bbb941..c112967c017 100644 --- a/pkg/chunk/schema_util_test.go +++ b/pkg/chunk/schema_util_test.go @@ -99,7 +99,7 @@ func TestParseChunkTimeRangeValue(t *testing.T) { 
{[]byte("a1b2c3d4\x00Y29kZQ\x002:1484661279394:1484664879394\x004\x00"), "code", "2:1484661279394:1484664879394"}, } { - chunkID, labelValue, _, err := parseChunkTimeRangeValue(c.encoded, nil) + chunkID, labelValue, _, _, err := parseChunkTimeRangeValue(c.encoded, nil) require.NoError(t, err) assert.Equal(t, model.LabelValue(c.value), labelValue) assert.Equal(t, c.chunkID, chunkID) diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go new file mode 100644 index 00000000000..55fbc00444d --- /dev/null +++ b/pkg/chunk/series_store.go @@ -0,0 +1,211 @@ +package chunk + +import ( + "context" + "fmt" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/util" + "github.com/weaveworks/cortex/pkg/util/extract" +) + +// seriesStore implements Store +type seriesStore struct { + store +} + +func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { + fetcher, err := newChunkFetcher(cfg.CacheConfig, storage) + if err != nil { + return nil, err + } + + return &seriesStore{ + store: store{ + cfg: cfg, + storage: storage, + schema: schema, + chunkFetcher: fetcher, + }, + }, nil +} + +// Get implements Store +func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.Get") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers)) + + // Validate the query is within reasonable bounds. + shortcut, err := c.validateQuery(ctx, from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + + // Ensure this query includes a metric name. 
+ metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) + if !ok || metricNameMatcher.Type != labels.MatchEqual { + return nil, fmt.Errorf("query must contain metric name") + } + log.Span.SetTag("metric", metricNameMatcher.Value) + + // Fetch the series IDs from the index, based on non-empty matchers from + // the query. + filters, matchers := util.SplitFiltersAndMatchers(matchers) + seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, metricNameMatcher.Value, matchers) + if err != nil { + return nil, err + } + level.Debug(log).Log("Series IDs", len(seriesIDs)) + + // Lookup the series in the index to get the chunks. + chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, seriesIDs) + if err != nil { + return nil, err + } + level.Debug(log).Log("Chunk IDs", len(chunkIDs)) + + // Protect ourselves against OOMing. + if len(chunkIDs) > c.cfg.QueryChunkLimit { + err := fmt.Errorf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunkIDs), c.cfg.QueryChunkLimit) + level.Error(log).Log("err", err) + return nil, err + } + + // Filter out chunks that are not in the selected time range. + chunks, err := c.convertChunkIDsToChunks(ctx, chunkIDs) + if err != nil { + return nil, err + } + filtered, keys := filterChunksByTime(from, through, chunks) + level.Debug(log).Log("Chunks post filtering", len(chunks)) + + // Now fetch the actual chunk data from Memcache / S3 + allChunks, err := c.fetchChunks(ctx, filtered, keys) + if err != nil { + level.Error(log).Log("err", err) + return nil, err + } + + // Filter out chunks based on the empty matchers in the query. 
+ filteredChunks := filterChunksByMatchers(allChunks, filters) + return filteredChunks, nil +} + +func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, metricName string, matchers []*labels.Matcher) ([]string, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") + log.Finish() + + // Just get series for metric if there are no matchers + if len(matchers) == 0 { + return c.lookupSeriesByMetricNameMatcher(ctx, from, through, metricName, nil) + } + + // Otherwise get series which include other matchers + incomingIDs := make(chan []string) + incomingErrors := make(chan error) + for _, matcher := range matchers { + go func(matcher *labels.Matcher) { + ids, err := c.lookupSeriesByMetricNameMatcher(ctx, from, through, metricName, matcher) + if err != nil { + incomingErrors <- err + return + } + incomingIDs <- ids + }(matcher) + } + + // Receive chunkSets from all matchers + var ids []string + var lastErr error + for i := 0; i < len(matchers); i++ { + select { + case incoming := <-incomingIDs: + if ids == nil { + ids = incoming + } else { + ids = intersectStrings(ids, incoming) + } + case err := <-incomingErrors: + lastErr = err + } + } + if lastErr != nil { + return nil, lastErr + } + + level.Debug(log).Log("msg", "post intersection", "ids", len(ids)) + return ids, nil +} + +func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, metricName string, matcher *labels.Matcher) ([]string, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") + defer log.Span.Finish() + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + var queries []IndexQuery + if matcher == nil { + queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName)) + } else if matcher.Type != labels.MatchEqual { + queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, 
model.LabelValue(metricName), model.LabelName(matcher.Name)) + } else { + queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value)) + } + if err != nil { + return nil, err + } + level.Debug(log).Log("queries", len(queries), "matcher", matcher) + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + return nil, err + } + level.Debug(log).Log("entries", len(entries), "matcher", matcher) + + ids, err := c.parseIndexEntries(ctx, entries, matcher) + if err != nil { + return nil, err + } + level.Debug(log).Log("ids", len(ids), "matcher", matcher) + + return ids, nil +} + +func (c *seriesStore) lookupChunksBySeries(ctx context.Context, from, through model.Time, seriesIDs []string) ([]string, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksBySeries") + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + level.Debug(log).Log("seriesIDs", len(seriesIDs)) + + queries := make([]IndexQuery, 0, len(seriesIDs)) + for _, seriesID := range seriesIDs { + qs, err := c.schema.GetChunksForSeries(from, through, userID, []byte(seriesID)) + if err != nil { + return nil, err + } + queries = append(queries, qs...) + } + level.Debug(log).Log("queries", len(queries)) + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + return nil, err + } + level.Debug(log).Log("entries", len(entries)) + + result, err := c.parseIndexEntries(ctx, entries, nil) + return result, err +} From 9248f0dd662843fe9719aea9e800b265841c7b15 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 12 Jul 2018 19:33:40 +0100 Subject: [PATCH 3/6] Tidy up some of the logging. 
Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store.go | 1 - pkg/chunk/chunk_store_utils.go | 10 +++++++--- pkg/chunk/series_store.go | 12 ++++++------ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index db31bd1fb56..e5cf73a545b 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -455,7 +455,6 @@ func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, mat } if matcher != nil && !matcher.Matches(string(labelValue)) { - level.Debug(util.WithContext(ctx, util.Logger)).Log("msg", "dropping chunk for non-matching label", "label", labelValue) continue } result = append(result, chunkKey) diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go index 2e54c5e3bdf..86694e6cd37 100644 --- a/pkg/chunk/chunk_store_utils.go +++ b/pkg/chunk/chunk_store_utils.go @@ -48,12 +48,16 @@ type spanLogger struct { ot.Span } -func newSpanLogger(ctx context.Context, method string) (*spanLogger, context.Context) { +func newSpanLogger(ctx context.Context, method string, kvps ...interface{}) (*spanLogger, context.Context) { span, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") - return &spanLogger{ + logger := &spanLogger{ Logger: log.With(util.WithContext(ctx, util.Logger), "method", method), Span: span, - }, ctx + } + if len(kvps) > 0 { + logger.Log(kvps...) 
+ } + return logger, ctx } func (s *spanLogger) Log(kvps ...interface{}) error { diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 55fbc00444d..48f0f477b31 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -53,7 +53,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc if !ok || metricNameMatcher.Type != labels.MatchEqual { return nil, fmt.Errorf("query must contain metric name") } - log.Span.SetTag("metric", metricNameMatcher.Value) + level.Debug(log).Log("metric", metricNameMatcher.Value) // Fetch the series IDs from the index, based on non-empty matchers from // the query. @@ -99,7 +99,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc } func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, metricName string, matchers []*labels.Matcher) ([]string, error) { - log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers)) log.Finish() // Just get series for metric if there are no matchers @@ -145,7 +145,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from } func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, metricName string, matcher *labels.Matcher) ([]string, error) { - log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupSeriesByMetricNameMatcher", "metricName", metricName, "matcher", matcher) defer log.Span.Finish() userID, err := user.ExtractOrgID(ctx) @@ -164,19 +164,19 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, if err != nil { return nil, err } - level.Debug(log).Log("queries", len(queries), "matcher", matcher) + level.Debug(log).Log("queries", len(queries)) entries, err 
:= c.lookupEntriesByQueries(ctx, queries) if err != nil { return nil, err } - level.Debug(log).Log("entries", len(entries), "matcher", matcher) + level.Debug(log).Log("entries", len(entries)) ids, err := c.parseIndexEntries(ctx, entries, matcher) if err != nil { return nil, err } - level.Debug(log).Log("ids", len(ids), "matcher", matcher) + level.Debug(log).Log("ids", len(ids)) return ids, nil } From 9810b169aa0dd0c23757cd02cd9d999798929968 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 13 Jul 2018 19:16:30 +0100 Subject: [PATCH 4/6] When a chunk spans store boundaries, it should be written to both. Signed-off-by: Tom Wilkie --- pkg/chunk/composite_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index 075ea6006a6..4e098564af8 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -118,7 +118,7 @@ func (c compositeStore) Put(ctx context.Context, chunks []Chunk) error { for _, chunk := range chunks { - err := c.forStores(chunk.From, chunk.From, func(_, _ model.Time, store Store) error { + err := c.forStores(chunk.From, chunk.Through, func(_, _ model.Time, store Store) error { return store.Put(ctx, []Chunk{chunk}) }) if err != nil { From e7aec97618f49de44a4c0295c5e0cb150c110f8e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 16 Jul 2018 12:17:41 +0100 Subject: [PATCH 5/6] Add heapCache, a cache that uses a heap for evictions. 
Signed-off-by: Tom Wilkie --- pkg/chunk/heap_cache.go | 97 ++++++++++++++++++++++++++++++++++++ pkg/chunk/heap_cache_test.go | 55 ++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 pkg/chunk/heap_cache.go create mode 100644 pkg/chunk/heap_cache_test.go diff --git a/pkg/chunk/heap_cache.go b/pkg/chunk/heap_cache.go new file mode 100644 index 00000000000..86d8d4be01b --- /dev/null +++ b/pkg/chunk/heap_cache.go @@ -0,0 +1,97 @@ +package chunk + +import ( + "container/heap" + "sync" + "time" +) + +// heapCache is a simple string -> interface{} cache which uses a heap to manage +// evictions. O(log N) inserts, O(n) get and update. +type heapCache struct { + lock sync.RWMutex + size int + entries []cacheEntry + index map[string]int +} + +type cacheEntry struct { + updated time.Time + key string + value interface{} +} + +func newHeapCache(size int) *heapCache { + return &heapCache{ + size: size, + entries: make([]cacheEntry, 0, size), + index: make(map[string]int, size), + } +} + +func (c *heapCache) Len() int { + return len(c.entries) +} + +func (c *heapCache) Less(i, j int) bool { + return c.entries[i].updated.Before(c.entries[j].updated) +} + +func (c *heapCache) Swap(i, j int) { + c.entries[i], c.entries[j] = c.entries[j], c.entries[i] + c.index[c.entries[i].key] = i + c.index[c.entries[j].key] = j +} + +func (c *heapCache) Push(x interface{}) { + n := len(c.entries) + entry := x.(cacheEntry) + c.entries = append(c.entries, entry) + c.index[entry.key] = n +} + +func (c *heapCache) Pop() interface{} { + n := len(c.entries) + entry := c.entries[n-1] + c.entries = c.entries[0 : n-1] + delete(c.index, entry.key) + return entry +} + +func (c *heapCache) put(key string, value interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + + // See if we already have the entry + index, ok := c.index[key] + if ok { + c.entries[index].updated = time.Now() + c.entries[index].value = value + heap.Fix(c, index) + return + } + + // Otherwise, see if we need to 
evict an entry (heap will update index). + if len(c.entries) >= c.size { + heap.Pop(c) + } + + // Put new entry on the heap (heap will update index). + heap.Push(c, cacheEntry{ + updated: time.Now(), + key: key, + value: value, + }) +} + +func (c *heapCache) get(key string) (value interface{}, ok bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + var index int + index, ok = c.index[key] + if ok { + value = c.entries[index].value + } + return +} diff --git a/pkg/chunk/heap_cache_test.go b/pkg/chunk/heap_cache_test.go new file mode 100644 index 00000000000..568f7909910 --- /dev/null +++ b/pkg/chunk/heap_cache_test.go @@ -0,0 +1,55 @@ +package chunk + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHeapCache(t *testing.T) { + c := newHeapCache(10) + + // Check put / get works + for i := 0; i < 10; i++ { + c.put(strconv.Itoa(i), i) + } + require.Len(t, c.index, 10) + require.Len(t, c.entries, 10) + + for i := 0; i < 10; i++ { + value, ok := c.get(strconv.Itoa(i)) + require.True(t, ok) + require.Equal(t, i, value.(int)) + } + + // Check evictions + for i := 10; i < 15; i++ { + c.put(strconv.Itoa(i), i) + } + require.Len(t, c.index, 10) + require.Len(t, c.entries, 10) + + for i := 0; i < 5; i++ { + _, ok := c.get(strconv.Itoa(i)) + require.False(t, ok) + } + for i := 5; i < 15; i++ { + value, ok := c.get(strconv.Itoa(i)) + require.True(t, ok) + require.Equal(t, i, value.(int)) + } + + // Check updates work + for i := 5; i < 15; i++ { + c.put(strconv.Itoa(i), i*2) + } + require.Len(t, c.index, 10) + require.Len(t, c.entries, 10) + + for i := 5; i < 15; i++ { + value, ok := c.get(strconv.Itoa(i)) + require.True(t, ok) + require.Equal(t, i*2, value.(int)) + } +} From a8364c58e6839982641419f4fc51bdd9dbb13ada Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 16 Jul 2018 12:33:44 +0100 Subject: [PATCH 6/6] Skip index queries for high cardinality labels. 
Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store.go | 10 ++++++-- pkg/chunk/heap_cache.go | 11 ++++++++- pkg/chunk/heap_cache_test.go | 8 +++---- pkg/chunk/series_store.go | 45 ++++++++++++++++++++++++++++++++---- 4 files changed, 62 insertions(+), 12 deletions(-) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index e5cf73a545b..a78becd10d3 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -52,8 +52,11 @@ func init() { type StoreConfig struct { CacheConfig cache.Config - MinChunkAge time.Duration - QueryChunkLimit int + MinChunkAge time.Duration + QueryChunkLimit int + CardinalityCacheSize int + CardinalityCacheValidity time.Duration + CardinalityLimit int } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -61,6 +64,9 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.CacheConfig.RegisterFlags(f) f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") + f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.") + f.DurationVar(&cfg.CardinalityCacheValidity, "store.cardinality-cache-validity", 1*time.Hour, "Period for which entries in the cardinality cache are valid.") + f.IntVar(&cfg.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") } // store implements Store diff --git a/pkg/chunk/heap_cache.go b/pkg/chunk/heap_cache.go index 86d8d4be01b..5f3e56b3409 100644 --- a/pkg/chunk/heap_cache.go +++ b/pkg/chunk/heap_cache.go @@ -59,6 +59,10 @@ func (c *heapCache) Pop() interface{} { } func (c *heapCache) put(key string, value interface{}) { + if c.size == 0 { + return + } + c.lock.Lock() defer c.lock.Unlock() @@ -84,7 +88,11 @@ func (c *heapCache) put(key string, value interface{}) { }) 
} -func (c *heapCache) get(key string) (value interface{}, ok bool) { +func (c *heapCache) get(key string) (value interface{}, updated time.Time, ok bool) { + if c.size == 0 { + return + } + c.lock.RLock() defer c.lock.RUnlock() @@ -92,6 +100,7 @@ func (c *heapCache) get(key string) (value interface{}, ok bool) { index, ok = c.index[key] if ok { value = c.entries[index].value + updated = c.entries[index].updated } return } diff --git a/pkg/chunk/heap_cache_test.go b/pkg/chunk/heap_cache_test.go index 568f7909910..885e59c5651 100644 --- a/pkg/chunk/heap_cache_test.go +++ b/pkg/chunk/heap_cache_test.go @@ -18,7 +18,7 @@ func TestHeapCache(t *testing.T) { require.Len(t, c.entries, 10) for i := 0; i < 10; i++ { - value, ok := c.get(strconv.Itoa(i)) + value, _, ok := c.get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } @@ -31,11 +31,11 @@ func TestHeapCache(t *testing.T) { require.Len(t, c.entries, 10) for i := 0; i < 5; i++ { - _, ok := c.get(strconv.Itoa(i)) + _, _, ok := c.get(strconv.Itoa(i)) require.False(t, ok) } for i := 5; i < 15; i++ { - value, ok := c.get(strconv.Itoa(i)) + value, _, ok := c.get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } @@ -48,7 +48,7 @@ func TestHeapCache(t *testing.T) { require.Len(t, c.entries, 10) for i := 5; i < 15; i++ { - value, ok := c.get(strconv.Itoa(i)) + value, _, ok := c.get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i*2, value.(int)) } diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 48f0f477b31..54b845740d3 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -2,7 +2,9 @@ package chunk import ( "context" + "errors" "fmt" + "time" "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" @@ -13,9 +15,14 @@ import ( "github.com/weaveworks/cortex/pkg/util/extract" ) +var ( + errCardinalityExceeded = errors.New("cardinality limit exceeded") +) + // seriesStore implements Store type seriesStore struct { store + 
cardinalityCache *heapCache } func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { @@ -31,6 +38,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor schema: schema, chunkFetcher: fetcher, }, + cardinalityCache: newHeapCache(cfg.CardinalityCacheSize), }, nil } @@ -49,7 +57,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc } // Ensure this query includes a metric name. - metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) + metricNameMatcher, allMatchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) if !ok || metricNameMatcher.Type != labels.MatchEqual { return nil, fmt.Errorf("query must contain metric name") } @@ -57,7 +65,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc // Fetch the series IDs from the index, based on non-empty matchers from // the query. - filters, matchers := util.SplitFiltersAndMatchers(matchers) + _, matchers := util.SplitFiltersAndMatchers(allMatchers) seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, metricNameMatcher.Value, matchers) if err != nil { return nil, err @@ -94,7 +102,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc } // Filter out chunks based on the empty matchers in the query. 
- filteredChunks := filterChunksByMatchers(allChunks, filters) + filteredChunks := filterChunksByMatchers(allChunks, allMatchers) return filteredChunks, nil } @@ -124,6 +132,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from // Receive chunkSets from all matchers var ids []string var lastErr error + var cardinalityExceededErrors int for i := 0; i < len(matchers); i++ { select { case incoming := <-incomingIDs: @@ -133,10 +142,16 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from ids = intersectStrings(ids, incoming) } case err := <-incomingErrors: - lastErr = err + if err == errCardinalityExceeded { + cardinalityExceededErrors++ + } else { + lastErr = err + } } } - if lastErr != nil { + if cardinalityExceededErrors == len(matchers) { + return nil, errCardinalityExceeded + } else if lastErr != nil { return nil, lastErr } @@ -166,12 +181,32 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, } level.Debug(log).Log("queries", len(queries)) + for _, query := range queries { + value, updated, ok := c.cardinalityCache.get(query.HashValue) + if !ok { + continue + } + entryAge := time.Now().Sub(updated) + cardinality := value.(int) + if entryAge < c.cfg.CardinalityCacheValidity && cardinality > c.cfg.CardinalityLimit { + return nil, errCardinalityExceeded + } + } + entries, err := c.lookupEntriesByQueries(ctx, queries) if err != nil { return nil, err } level.Debug(log).Log("entries", len(entries)) + // TODO This is not correct, will overcount for queries > 24hrs + for _, query := range queries { + c.cardinalityCache.put(query.HashValue, len(entries)) + } + if len(entries) > c.cfg.CardinalityLimit { + return nil, errCardinalityExceeded + } + ids, err := c.parseIndexEntries(ctx, entries, matcher) if err != nil { return nil, err