diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 23b7a51960e..fc70dfbd1db 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -70,7 +70,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 096ee7f25d7..ec161e2c87a 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -78,7 +78,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/querier/main.go b/cmd/querier/main.go index 9429746a22b..e309107f664 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -80,7 +80,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index f480e4c8c22..6649b10c5da 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -57,7 +57,7 @@ func main() { os.Exit(1) } - chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient) + chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient) if err != nil { level.Error(util.Logger).Log("err", err) os.Exit(1) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 24d3b1c1c20..a78becd10d3 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -7,10 +7,7 @@ import ( "sort" "time" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -55,11 +52,11 @@ func init() { type StoreConfig struct { CacheConfig cache.Config - MinChunkAge time.Duration - QueryChunkLimit int - - // For injecting different schemas in tests. 
- schemaFactory func(cfg SchemaConfig) Schema + MinChunkAge time.Duration + QueryChunkLimit int + CardinalityCacheSize int + CardinalityCacheValidity time.Duration + CardinalityLimit int } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -67,50 +64,41 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.CacheConfig.RegisterFlags(f) f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") + f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.") + f.DurationVar(&cfg.CardinalityCacheValidity, "store.cardinality-cache-validity", 1*time.Hour, "Period for which entries in the cardinality cache are valid.") + f.IntVar(&cfg.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") } -// Store implements Store -type Store struct { +// store implements Store +type store struct { cfg StoreConfig storage StorageClient - cache cache.Cache schema Schema + *chunkFetcher } -// NewStore makes a new ChunkStore -func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (*Store, error) { - var schema Schema - var err error - if cfg.schemaFactory == nil { - schema, err = newCompositeSchema(schemaCfg) - } else { - schema = cfg.schemaFactory(schemaCfg) - } - if err != nil { - return nil, err - } - - cache, err := cache.New(cfg.CacheConfig) +func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { + fetcher, err := newChunkFetcher(cfg.CacheConfig, storage) if err != nil { return nil, err } - return &Store{ - cfg: cfg, - storage: storage, - schema: schema, - cache: cache, + return &store{ + cfg: cfg, + storage: storage, + schema: schema, + chunkFetcher: fetcher, }, nil } // Stop any background goroutines (ie in the cache.) -func (c *Store) Stop() { +func (c *store) Stop() { c.cache.Stop() } // Put implements ChunkStore -func (c *Store) Put(ctx context.Context, chunks []Chunk) error { +func (c *store) Put(ctx context.Context, chunks []Chunk) error { userID, err := user.ExtractOrgID(ctx) if err != nil { return err @@ -125,8 +113,8 @@ func (c *Store) Put(ctx context.Context, chunks []Chunk) error { return c.updateIndex(ctx, userID, chunks) } -func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { - writeReqs, err := c.calculateDynamoWrites(userID, chunks) +func (c *store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { + writeReqs, err := c.calculateIndexEntries(userID, chunks) if err != nil { return err } @@ -134,9 +122,8 @@ func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) return c.storage.BatchWrite(ctx, writeReqs) } -// calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all -// the chunks it is given. -func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch, error) { +// calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given. 
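+// Entries are de-duplicated on tableName:hashValue:rangeValue within the batch,
+// so each unique index row is written (and counted in rowWrites) at most once
+// per call.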
+func (c *store) calculateIndexEntries(userID string, chunks []Chunk) (WriteBatch, error) { seenIndexEntries := map[string]struct{}{} writeReqs := c.storage.NewWriteBatch() @@ -153,88 +140,77 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch indexEntriesPerChunk.Observe(float64(len(entries))) // Remove duplicate entries based on tableName:hashValue:rangeValue - unseenEntries := []IndexEntry{} for _, entry := range entries { key := fmt.Sprintf("%s:%s:%x", entry.TableName, entry.HashValue, entry.RangeValue) if _, ok := seenIndexEntries[key]; !ok { seenIndexEntries[key] = struct{}{} - unseenEntries = append(unseenEntries, entry) + rowWrites.Observe(entry.HashValue, 1) + writeReqs.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) } } - - for _, entry := range unseenEntries { - rowWrites.Observe(entry.HashValue, 1) - writeReqs.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) - } } return writeReqs, nil } -// spanLogger unifies tracing and logging, to reduce repetition. -type spanLogger struct { - log.Logger - ot.Span -} - -func newSpanLogger(ctx context.Context, method string) (*spanLogger, context.Context) { - span, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") - return &spanLogger{ - Logger: log.With(util.WithContext(ctx, util.Logger), "method", method), - Span: span, - }, ctx -} +// Get implements Store +func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.Get") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers)) -func (s *spanLogger) Log(kvps ...interface{}) error { - s.Logger.Log(kvps...) - fields, err := otlog.InterleavedKVToFields(kvps...) + // Validate the query is within reasonable bounds. + shortcut, err := c.validateQuery(ctx, from, &through) if err != nil { - return err + return nil, err + } else if shortcut { + return nil, nil } - s.Span.LogFields(fields...) - return nil + + // Fetch metric name chunks if the matcher is of type equal, + metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) + if ok && metricNameMatcher.Type == labels.MatchEqual { + log.Span.SetTag("metric", metricNameMatcher.Value) + return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value) + } + + // Otherwise we consult the metric name index first and then create queries for each matching metric name. + return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher) } -// Get implements ChunkStore -func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { - log, ctx := newSpanLogger(ctx, "ChunkStore.Get") +func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time) (shortcut bool, err error) { + log, ctx := newSpanLogger(ctx, "store.validateQuery") defer log.Span.Finish() now := model.Now() - level.Debug(log).Log("from", from, "through", through, "now", now, "matchers", len(allMatchers)) - if through < from { - return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from) + if *through < from { + err = fmt.Errorf("invalid query, through < from (%d < %d)", through, from) + return } if from.After(now) { // time-span start is in future ... 
regard as legal level.Error(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) - return nil, nil + shortcut = true + return } if from.After(now.Add(-c.cfg.MinChunkAge)) { // no data relevant to this query will have arrived at the store yet - return nil, nil + shortcut = true + return } if through.After(now.Add(5 * time.Minute)) { // time-span end is in future ... regard as legal level.Error(log).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now) - through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes + *through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes } - // Fetch metric name chunks if the matcher is of type equal, - metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) - if ok && metricNameMatcher.Type == labels.MatchEqual { - log.Span.SetTag("metric", metricNameMatcher.Value) - return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value) - } - - // Otherwise we consult the metric name index first and then create queries for each matching metric name. - return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher) + return } -func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { +func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { log, ctx := newSpanLogger(ctx, "ChunkStore.getMetricNameChunks") defer log.Finish() level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "matchers", len(allMatchers)) @@ -247,15 +223,7 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim level.Debug(log).Log("Chunks in index", len(chunks)) // Filter out chunks that are not in the selected time range. - filtered := make([]Chunk, 0, len(chunks)) - keys := make([]string, 0, len(chunks)) - for _, chunk := range chunks { - if chunk.Through < from || through < chunk.From { - continue - } - filtered = append(filtered, chunk) - keys = append(keys, chunk.ExternalKey()) - } + filtered, keys := filterChunksByTime(from, through, chunks) level.Debug(log).Log("Chunks post filtering", len(chunks)) if len(filtered) > c.cfg.QueryChunkLimit { @@ -265,80 +233,17 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim } // Now fetch the actual chunk data from Memcache / S3 - cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) - if err != nil { - level.Warn(log).Log("msg", "error fetching from cache", "err", err) - } - - fromCache, missing, err := ProcessCacheResponse(filtered, cacheHits, cacheBufs) - if err != nil { - level.Warn(log).Log("msg", "error fetching from cache", "err", err) - } - - fromStorage, err := c.storage.GetChunks(ctx, missing) - - // Always cache any chunks we did get - if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil { - level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) - } - + allChunks, err := c.fetchChunks(ctx, filtered, keys) if err != nil { return nil, promql.ErrStorage(err) } - allChunks := append(fromCache, fromStorage...) 
- - // Filter out chunks - filteredChunks := make([]Chunk, 0, len(allChunks)) -outer: - for _, chunk := range allChunks { - for _, filter := range filters { - if !filter.Matches(string(chunk.Metric[model.LabelName(filter.Name)])) { - continue outer - } - } - filteredChunks = append(filteredChunks, chunk) - } - + // Filter out chunks based on the empty matchers in the query. + filteredChunks := filterChunksByMatchers(allChunks, filters) return filteredChunks, nil } -// ProcessCacheResponse decodes the chunks coming back from the cache, separating -// hits and misses. -func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { - decodeContext := NewDecodeContext() - - i, j := 0, 0 - for i < len(chunks) && j < len(keys) { - chunkKey := chunks[i].ExternalKey() - - if chunkKey < keys[j] { - missing = append(missing, chunks[i]) - i++ - } else if chunkKey > keys[j] { - level.Debug(util.Logger).Log("msg", "got chunk from cache we didn't ask for") - j++ - } else { - chunk := chunks[i] - err = chunk.Decode(decodeContext, bufs[j]) - if err != nil { - cacheCorrupt.Inc() - return - } - found = append(found, chunk) - i++ - j++ - } - } - - for ; i < len(chunks); i++ { - missing = append(missing, chunks[i]) - } - - return -} - -func (c *Store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) { +func (c *store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) { // Get all series from the index userID, err := user.ExtractOrgID(ctx) if err != nil { @@ -402,7 +307,7 @@ outer: return chunks, nil } -func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { +func (c *store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") defer log.Finish() @@ -490,14 +395,13 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode if lastErr != nil { return nil, lastErr } - - level.Debug(log).Log("msg", "post intersection", "entries", len(chunkIDs)) + level.Debug(log).Log("msg", "post intersection", "chunkIDs", len(chunkIDs)) // Convert IndexEntry's into chunks return c.convertChunkIDsToChunks(ctx, chunkIDs) } -func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { +func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { incomingEntries := make(chan []IndexEntry) incomingErrors := make(chan error) for _, query := range queries { @@ -526,7 +430,7 @@ func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery return entries, lastErr } -func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { +func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) { @@ -547,17 +451,16 @@ func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I return entries, nil } -func (c *Store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) 
{ +func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { result := make([]string, 0, len(entries)) for _, entry := range entries { - chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + chunkKey, labelValue, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) if err != nil { return nil, err } if matcher != nil && !matcher.Matches(string(labelValue)) { - level.Debug(util.WithContext(ctx, util.Logger)).Log("msg", "dropping chunk for non-matching label", "label", labelValue) continue } result = append(result, chunkKey) @@ -569,7 +472,7 @@ func (c *Store) parseIndexEntries(ctx context.Context, entries []IndexEntry, mat return result, nil } -func (c *Store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { +func (c *store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err @@ -586,16 +489,3 @@ func (c *Store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) return chunkSet, nil } - -func (c *Store) writeBackCache(ctx context.Context, chunks []Chunk) error { - for i := range chunks { - encoded, err := chunks[i].Encode() - if err != nil { - return err - } - if err := c.cache.StoreChunk(ctx, chunks[i].ExternalKey(), encoded); err != nil { - return err - } - } - return nil -} diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 9bf523de0b3..dc58ae412dd 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" + "github.com/weaveworks/cortex/pkg/util" "github.com/weaveworks/cortex/pkg/util/extract" "golang.org/x/net/context" @@ -21,15 +22,42 @@ import ( "github.com/weaveworks/common/user" ) +type schemaFactory func(cfg SchemaConfig) Schema +type storeFactory func(StoreConfig, Schema, StorageClient) (Store, error) + +var schemas = []struct { + name string + schemaFn schemaFactory + storeFn storeFactory + requireMetricName bool +}{ + {"v1 schema", v1Schema, newStore, true}, + {"v2 schema", v2Schema, newStore, true}, + {"v3 schema", v3Schema, newStore, true}, + {"v4 schema", v4Schema, newStore, true}, + {"v5 schema", v5Schema, newStore, true}, + {"v6 schema", v6Schema, newStore, true}, + {"v7 schema", v7Schema, newStore, true}, + {"v8 schema", v8Schema, newStore, false}, + {"v9 schema", v9Schema, newSeriesStore, true}, +} + // newTestStore creates a new Store for testing. 
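+// newTestChunkStore loads flag defaults into the store and schema configs,
+// creates and syncs the required tables against MockStorage, and then builds
+// the store from the given schema and store factories.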
-func newTestChunkStore(t *testing.T, cfg StoreConfig) *Store { +func newTestChunkStore(t *testing.T, schemaFactory schemaFactory, storeFactory storeFactory) Store { + var ( + storeCfg StoreConfig + schemaCfg SchemaConfig + ) + util.DefaultValues(&storeCfg, &schemaCfg) + storage := NewMockStorage() - schemaCfg := SchemaConfig{} tableManager, err := NewTableManager(schemaCfg, maxChunkAge, storage) require.NoError(t, err) + err = tableManager.SyncTables(context.Background()) require.NoError(t, err) - store, err := NewStore(cfg, schemaCfg, storage) + + store, err := storeFactory(storeCfg, schemaFactory(schemaCfg), storage) require.NoError(t, err) return store } @@ -102,21 +130,6 @@ func TestChunkStore_Get(t *testing.T) { barSampleStream2, err := createSampleStreamFrom(barChunk2) require.NoError(t, err) - schemas := []struct { - name string - fn func(cfg SchemaConfig) Schema - requireMetricName bool - }{ - {"v1 schema", v1Schema, true}, - {"v2 schema", v2Schema, true}, - {"v3 schema", v3Schema, true}, - {"v4 schema", v4Schema, true}, - {"v5 schema", v5Schema, true}, - {"v6 schema", v6Schema, true}, - {"v7 schema", v7Schema, true}, - {"v8 schema", v8Schema, false}, - } - for _, tc := range []struct { query string expect model.Matrix @@ -193,10 +206,7 @@ func TestChunkStore_Get(t *testing.T) { for _, schema := range schemas { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, - QueryChunkLimit: 2e6, - }) + store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) if err := store.Put(ctx, []Chunk{ fooChunk1, @@ -226,8 +236,6 @@ func TestChunkStore_Get(t *testing.T) { sort.Sort(ByFingerprint(matrix1)) if !reflect.DeepEqual(tc.expect, matrix1) { - t.Fatalf("jml\nstart = %#v\nnow = %#v\nfooChunk1 = %#v\nfooChunk2 = %#v\nbarChunk1 = %#v\nbarChunk2 = %#v\n", - now.Add(-time.Hour), now, fooChunk1, fooChunk2, barChunk1, barChunk2) t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, matrix1)) } @@ -259,7 +267,6 @@ func TestChunkStore_Get(t *testing.T) { func TestChunkStore_getMetricNameChunks(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) now := model.Now() - metricName := "foo" chunk1 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", "bar": "baz", @@ -272,83 +279,62 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { "toms": "code", }) - schemas := []struct { - name string - fn func(cfg SchemaConfig) Schema - }{ - {"v1 schema", v1Schema}, - {"v2 schema", v2Schema}, - {"v3 schema", v3Schema}, - {"v4 schema", v4Schema}, - {"v5 schema", v5Schema}, - {"v6 schema", v6Schema}, - {"v7 schema", v7Schema}, - {"v8 schema", v8Schema}, - } - for _, tc := range []struct { - query string - expect []Chunk - matchers []*labels.Matcher + query string + expect []Chunk }{ { `foo`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{}, }, { `foo{flip=""}`, []Chunk{chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "flip", "")}, }, { `foo{bar="baz"}`, []Chunk{chunk1}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")}, }, { `foo{bar="beep"}`, []Chunk{chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "beep")}, }, { `foo{toms="code"}`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code")}, }, { `foo{bar!="baz"}`, []Chunk{chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchNotEqual, 
"bar", "baz")}, }, { `foo{bar=~"beep|baz"}`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchRegexp, "bar", "beep|baz")}, }, { `foo{toms="code", bar=~"beep|baz"}`, []Chunk{chunk1, chunk2}, - []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code"), mustNewLabelMatcher(labels.MatchRegexp, "bar", "beep|baz")}, }, { `foo{toms="code", bar="baz"}`, - []Chunk{chunk1}, []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code"), mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")}, + []Chunk{chunk1}, }, } { for _, schema := range schemas { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, - QueryChunkLimit: 2e6, - }) + store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { t.Fatal(err) } - chunks, err := store.getMetricNameChunks(ctx, now.Add(-time.Hour), now, tc.matchers, metricName) + matchers, err := promql.ParseMetricSelector(tc.query) + if err != nil { + t.Fatal(err) + } + + chunks, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) require.NoError(t, err) if !reflect.DeepEqual(tc.expect, chunks) { @@ -369,96 +355,73 @@ func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) func TestChunkStoreRandom(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) - schemas := []struct { - name string - fn func(cfg SchemaConfig) Schema - store *Store - }{ - {name: "v1 schema", fn: v1Schema}, - {name: "v2 schema", fn: v2Schema}, - {name: "v3 schema", fn: v3Schema}, - {name: "v4 schema", fn: v4Schema}, - {name: "v5 schema", fn: v5Schema}, - {name: "v6 schema", fn: v6Schema}, - {name: "v7 schema", fn: v7Schema}, - {name: "v8 schema", fn: v8Schema}, - } - for i := range schemas { - schemas[i].store = newTestChunkStore(t, StoreConfig{ - schemaFactory: schemas[i].fn, - QueryChunkLimit: 2e6, - }) - } + for _, schema := range schemas { + t.Run(schema.name, func(t *testing.T) { + store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) + + // put 100 chunks from 0 to 99 + const chunkLen = 13 * 3600 // in seconds + for i := 0; i < 100; i++ { + ts := model.TimeFromUnix(int64(i * chunkLen)) + chunks, _ := chunk.New().Add(model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(float64(i)), + }) + chunk := NewChunk( + userID, + model.Fingerprint(1), + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + }, + chunks[0], + ts, + ts.Add(chunkLen*time.Second), + ) + + err := store.Put(ctx, []Chunk{chunk}) + require.NoError(t, err) + } - // put 100 chunks from 0 to 99 - const chunkLen = 13 * 3600 // in seconds - for i := 0; i < 100; i++ { - ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := chunk.New().Add(model.SamplePair{ - Timestamp: ts, - Value: model.SampleValue(float64(i)), - }) - chunk := NewChunk( - userID, - model.Fingerprint(1), - model.Metric{ - model.MetricNameLabel: "foo", - "bar": "baz", - }, - chunks[0], - ts, - ts.Add(chunkLen*time.Second), - ) - for _, s := range schemas { - err := s.store.Put(ctx, []Chunk{chunk}) - require.NoError(t, err) - } - } + // pick two random numbers and do a query + for i := 0; i < 100; i++ { + start := rand.Int63n(100 * chunkLen) + end := start + rand.Int63n((100*chunkLen)-start) + assert.True(t, start < end) - // pick two random numbers and do a query - for i := 0; i < 100; i++ { - start := 
rand.Int63n(100 * chunkLen) - end := start + rand.Int63n((100*chunkLen)-start) - assert.True(t, start < end) + startTime := model.TimeFromUnix(start) + endTime := model.TimeFromUnix(end) - startTime := model.TimeFromUnix(start) - endTime := model.TimeFromUnix(end) + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"), + mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"), + } + chunks, err := store.Get(ctx, startTime, endTime, matchers...) + require.NoError(t, err) - metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") - matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")} - - for _, s := range schemas { - chunks, err := s.store.getMetricNameChunks(ctx, startTime, endTime, - matchers, - metricNameLabel.Value, - ) - require.NoError(t, err) - - // We need to check that each chunk is in the time range - for _, chunk := range chunks { - assert.False(t, chunk.From.After(endTime)) - assert.False(t, chunk.Through.Before(startTime)) - samples, err := chunk.Samples(chunk.From, chunk.Through) - assert.NoError(t, err) - assert.Equal(t, 1, len(samples)) - // TODO verify chunk contents - } + // We need to check that each chunk is in the time range + for _, chunk := range chunks { + assert.False(t, chunk.From.After(endTime)) + assert.False(t, chunk.Through.Before(startTime)) + samples, err := chunk.Samples(chunk.From, chunk.Through) + assert.NoError(t, err) + assert.Equal(t, 1, len(samples)) + // TODO verify chunk contents + } - // And check we got all the chunks we want - numChunks := (end / chunkLen) - (start / chunkLen) + 1 - assert.Equal(t, int(numChunks), len(chunks), s.name) - } + // And check we got all the chunks we want + numChunks := (end / chunkLen) - (start / chunkLen) + 1 + assert.Equal(t, int(numChunks), len(chunks)) + } + }) } } func TestChunkStoreLeastRead(t *testing.T) { // Test we don't read too much from the index ctx := user.InjectOrgID(context.Background(), userID) - store := newTestChunkStore(t, StoreConfig{ - schemaFactory: v6Schema, - QueryChunkLimit: 2e6, - }) + store := newTestChunkStore(t, v6Schema, newStore) // Put 24 chunks 1hr chunks in the store const chunkLen = 60 // in seconds @@ -492,14 +455,12 @@ func TestChunkStoreLeastRead(t *testing.T) { startTime := model.TimeFromUnix(start) endTime := model.TimeFromUnix(end) + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"), + mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"), + } - metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") - matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")} - - chunks, err := store.getMetricNameChunks(ctx, startTime, endTime, - matchers, - metricNameLabel.Value, - ) + chunks, err := store.Get(ctx, startTime, endTime, matchers...) 
if err != nil { t.Fatal(t, err) } diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go new file mode 100644 index 00000000000..86694e6cd37 --- /dev/null +++ b/pkg/chunk/chunk_store_utils.go @@ -0,0 +1,172 @@ +package chunk + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + + "github.com/weaveworks/cortex/pkg/chunk/cache" + "github.com/weaveworks/cortex/pkg/util" +) + +func filterChunksByTime(from, through model.Time, chunks []Chunk) ([]Chunk, []string) { + filtered := make([]Chunk, 0, len(chunks)) + keys := make([]string, 0, len(chunks)) + for _, chunk := range chunks { + if chunk.Through < from || through < chunk.From { + continue + } + filtered = append(filtered, chunk) + keys = append(keys, chunk.ExternalKey()) + } + return filtered, keys +} + +func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk { + filteredChunks := make([]Chunk, 0, len(chunks)) +outer: + for _, chunk := range chunks { + for _, filter := range filters { + if !filter.Matches(string(chunk.Metric[model.LabelName(filter.Name)])) { + continue outer + } + } + filteredChunks = append(filteredChunks, chunk) + } + return filteredChunks +} + +// spanLogger unifies tracing and logging, to reduce repetition. +type spanLogger struct { + log.Logger + ot.Span +} + +func newSpanLogger(ctx context.Context, method string, kvps ...interface{}) (*spanLogger, context.Context) { + span, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") + logger := &spanLogger{ + Logger: log.With(util.WithContext(ctx, util.Logger), "method", method), + Span: span, + } + if len(kvps) > 0 { + logger.Log(kvps...) + } + return logger, ctx +} + +func (s *spanLogger) Log(kvps ...interface{}) error { + s.Logger.Log(kvps...) + fields, err := otlog.InterleavedKVToFields(kvps...) + if err != nil { + return err + } + s.Span.LogFields(fields...) + return nil +} + +// chunkFetcher deals with fetching chunk contents from the cache/store, +// and writing back any misses to the cache. 
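+//
+// fetchChunks consults the chunk cache for the given external keys, fetches any
+// misses from the StorageClient, and writes those back to the cache. A minimal
+// usage sketch (fetcher stands for any *chunkFetcher, e.g. the one embedded in
+// store, mirroring getMetricNameChunks above):
+//
+//	filtered, keys := filterChunksByTime(from, through, chunks)
+//	allChunks, err := fetcher.fetchChunks(ctx, filtered, keys)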
+type chunkFetcher struct { + storage StorageClient + cache cache.Cache +} + +func newChunkFetcher(cfg cache.Config, storage StorageClient) (*chunkFetcher, error) { + cache, err := cache.New(cfg) + if err != nil { + return nil, err + } + + return &chunkFetcher{ + storage: storage, + cache: cache, + }, nil +} + +func (c *chunkFetcher) Stop() { + c.cache.Stop() +} + +func (c *chunkFetcher) fetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.getMetricNameChunks") + defer log.Span.Finish() + + // Now fetch the actual chunk data from Memcache / S3 + cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) + if err != nil { + level.Warn(log).Log("msg", "error fetching from cache", "err", err) + } + + fromCache, missing, err := ProcessCacheResponse(chunks, cacheHits, cacheBufs) + if err != nil { + level.Warn(log).Log("msg", "error fetching from cache", "err", err) + } + + fromStorage, err := c.storage.GetChunks(ctx, missing) + + // Always cache any chunks we did get + if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil { + level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + } + + if err != nil { + return nil, promql.ErrStorage(err) + } + + allChunks := append(fromCache, fromStorage...) + return allChunks, nil +} + +func (c *chunkFetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { + for i := range chunks { + encoded, err := chunks[i].Encode() + if err != nil { + return err + } + if err := c.cache.StoreChunk(ctx, chunks[i].ExternalKey(), encoded); err != nil { + return err + } + } + return nil +} + +// ProcessCacheResponse decodes the chunks coming back from the cache, separating +// hits and misses. +func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { + decodeContext := NewDecodeContext() + + i, j := 0, 0 + for i < len(chunks) && j < len(keys) { + chunkKey := chunks[i].ExternalKey() + + if chunkKey < keys[j] { + missing = append(missing, chunks[i]) + i++ + } else if chunkKey > keys[j] { + level.Debug(util.Logger).Log("msg", "got chunk from cache we didn't ask for") + j++ + } else { + chunk := chunks[i] + err = chunk.Decode(decodeContext, bufs[j]) + if err != nil { + cacheCorrupt.Inc() + return + } + found = append(found, chunk) + i++ + j++ + } + } + + for ; i < len(chunks); i++ { + missing = append(missing, chunks[i]) + } + + return +} diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go new file mode 100644 index 00000000000..4e098564af8 --- /dev/null +++ b/pkg/chunk/composite_store.go @@ -0,0 +1,202 @@ +package chunk + +import ( + "context" + "fmt" + "sort" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" +) + +// Store for chunks. +type Store interface { + Put(ctx context.Context, chunks []Chunk) error + Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) + Stop() +} + +// compositeStore is a Store which delegates to various stores depending +// on when they were activated. 
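+//
+// For example (taken from the forStores tests below): with one store starting
+// at t=0 and another at t=100, a Get over [34, 165] is split into [34, 99]
+// against the first store and [100, 165] against the second, and the results
+// are concatenated.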
+type compositeStore struct { + stores []compositeStoreEntry +} + +type compositeStoreEntry struct { + start model.Time + Store +} + +type byStart []compositeStoreEntry + +func (a byStart) Len() int { return len(a) } +func (a byStart) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byStart) Less(i, j int) bool { return a[i].start < a[j].start } + +// NewCompositeStore creates a new Store which delegates to different stores depending +// on time. +func NewCompositeStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (Store, error) { + store, err := newStore(cfg, v1Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + + stores := []compositeStoreEntry{ + {0, store}, + } + + if schemaCfg.DailyBucketsFrom.IsSet() { + store, err := newStore(cfg, v2Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.DailyBucketsFrom.Time, store}) + } + + if schemaCfg.Base64ValuesFrom.IsSet() { + store, err := newStore(cfg, v3Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.Base64ValuesFrom.Time, store}) + } + + if schemaCfg.V4SchemaFrom.IsSet() { + store, err := newStore(cfg, v4Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V4SchemaFrom.Time, store}) + } + + if schemaCfg.V5SchemaFrom.IsSet() { + store, err := newStore(cfg, v5Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V5SchemaFrom.Time, store}) + } + + if schemaCfg.V6SchemaFrom.IsSet() { + store, err := newStore(cfg, v6Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V6SchemaFrom.Time, store}) + } + + if schemaCfg.V7SchemaFrom.IsSet() { + store, err := newStore(cfg, v7Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V7SchemaFrom.Time, store}) + } + + if schemaCfg.V8SchemaFrom.IsSet() { + store, err := newStore(cfg, v8Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V8SchemaFrom.Time, store}) + } + + if schemaCfg.V9SchemaFrom.IsSet() { + store, err := newSeriesStore(cfg, v9Schema(schemaCfg), storage) + if err != nil { + return nil, err + } + stores = append(stores, compositeStoreEntry{schemaCfg.V9SchemaFrom.Time, store}) + } + + if !sort.IsSorted(byStart(stores)) { + return nil, fmt.Errorf("schemas not in time-sorted order") + } + + return compositeStore{stores}, nil +} + +func (c compositeStore) Put(ctx context.Context, chunks []Chunk) error { + for _, chunk := range chunks { + err := c.forStores(chunk.From, chunk.Through, func(_, _ model.Time, store Store) error { + return store.Put(ctx, []Chunk{chunk}) + }) + if err != nil { + return err + } + } + return nil +} + +func (c compositeStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) { + var results []Chunk + err := c.forStores(from, through, func(from, through model.Time, store Store) error { + chunks, err := store.Get(ctx, from, through, matchers...) + if err != nil { + return err + } + results = append(results, chunks...) 
+ return nil + }) + return results, err +} + +func (c compositeStore) Stop() { + for _, store := range c.stores { + store.Stop() + } +} + +func (c compositeStore) forStores(from, through model.Time, callback func(from, through model.Time, store Store) error) error { + if len(c.stores) == 0 { + return nil + } + + // first, find the schema with the highest start _before or at_ from + i := sort.Search(len(c.stores), func(i int) bool { + return c.stores[i].start > from + }) + if i > 0 { + i-- + } else { + // This could happen if we get passed a sample from before 1970. + i = 0 + from = c.stores[0].start + } + + // next, find the schema with the lowest start _after_ through + j := sort.Search(len(c.stores), func(j int) bool { + return c.stores[j].start > through + }) + + min := func(a, b model.Time) model.Time { + if a < b { + return a + } + return b + } + + start := from + for ; i < j; i++ { + nextSchemaStarts := model.Latest + if i+1 < len(c.stores) { + nextSchemaStarts = c.stores[i+1].start + } + + // If the next schema starts at the same time as this one, + // skip this one. + if nextSchemaStarts == c.stores[i].start { + continue + } + + end := min(through, nextSchemaStarts-1) + err := callback(start, end, c.stores[i].Store) + if err != nil { + return err + } + + start = nextSchemaStarts + } + + return nil +} diff --git a/pkg/chunk/composite_store_test.go b/pkg/chunk/composite_store_test.go new file mode 100644 index 00000000000..c315aef0d27 --- /dev/null +++ b/pkg/chunk/composite_store_test.go @@ -0,0 +1,167 @@ +package chunk + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/test" +) + +type mockStore int + +func (m mockStore) Put(ctx context.Context, chunks []Chunk) error { + return nil +} + +func (m mockStore) Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) { + return nil, nil +} + +func (m mockStore) Stop() {} + +func TestCompositeStore(t *testing.T) { + type result struct { + from, through model.Time + store Store + } + collect := func(results *[]result) func(from, through model.Time, store Store) error { + return func(from, through model.Time, store Store) error { + *results = append(*results, result{from, through, store}) + return nil + } + } + cs := compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + {model.TimeFromUnix(100), mockStore(2)}, + {model.TimeFromUnix(200), mockStore(3)}, + }, + } + + for i, tc := range []struct { + cs compositeStore + from, through int64 + want []result + }{ + // Test we have sensible results when there are no schema's defined + {compositeStore{}, 0, 1, []result{}}, + + // Test we have sensible results when there is a single schema + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + }, + }, + 0, 10, + []result{ + {model.TimeFromUnix(0), model.TimeFromUnix(10), mockStore(1)}, + }, + }, + + // Test we have sensible results for negative (ie pre 1970) times + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + }, + }, + -10, -9, + []result{}, + }, + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + }, + }, + -10, 10, + []result{ + {model.TimeFromUnix(0), model.TimeFromUnix(10), mockStore(1)}, + }, + }, + + // Test we have sensible results when there is two schemas + { + compositeStore{ + stores: 
[]compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + {model.TimeFromUnix(100), mockStore(2)}, + }, + }, + 34, 165, + []result{ + {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockStore(1)}, + {model.TimeFromUnix(100), model.TimeFromUnix(165), mockStore(2)}, + }, + }, + + // Test we get only one result when two schema start at same time + { + compositeStore{ + stores: []compositeStoreEntry{ + {model.TimeFromUnix(0), mockStore(1)}, + {model.TimeFromUnix(10), mockStore(2)}, + {model.TimeFromUnix(10), mockStore(3)}, + }, + }, + 0, 165, + []result{ + {model.TimeFromUnix(0), model.TimeFromUnix(10) - 1, mockStore(1)}, + {model.TimeFromUnix(10), model.TimeFromUnix(165), mockStore(3)}, + }, + }, + + // Test all the various combination we can get when there are three schemas + { + cs, 34, 65, + []result{ + {model.TimeFromUnix(34), model.TimeFromUnix(65), mockStore(1)}, + }, + }, + + { + cs, 244, 6785, + []result{ + {model.TimeFromUnix(244), model.TimeFromUnix(6785), mockStore(3)}, + }, + }, + + { + cs, 34, 165, + []result{ + {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockStore(1)}, + {model.TimeFromUnix(100), model.TimeFromUnix(165), mockStore(2)}, + }, + }, + + { + cs, 151, 264, + []result{ + {model.TimeFromUnix(151), model.TimeFromUnix(200) - 1, mockStore(2)}, + {model.TimeFromUnix(200), model.TimeFromUnix(264), mockStore(3)}, + }, + }, + + { + cs, 32, 264, + []result{ + {model.TimeFromUnix(32), model.TimeFromUnix(100) - 1, mockStore(1)}, + {model.TimeFromUnix(100), model.TimeFromUnix(200) - 1, mockStore(2)}, + {model.TimeFromUnix(200), model.TimeFromUnix(264), mockStore(3)}, + }, + }, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + have := []result{} + tc.cs.forStores(model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), collect(&have)) + if !reflect.DeepEqual(tc.want, have) { + t.Fatalf("wrong stores - %s", test.Diff(tc.want, have)) + } + }) + } +} diff --git a/pkg/chunk/heap_cache.go b/pkg/chunk/heap_cache.go new file mode 100644 index 00000000000..5f3e56b3409 --- /dev/null +++ b/pkg/chunk/heap_cache.go @@ -0,0 +1,106 @@ +package chunk + +import ( + "container/heap" + "sync" + "time" +) + +// heapCache is a simple string -> interface{} cache which uses a heap to manage +// evictions. O(log N) inserts, O(n) get and update. 
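+//
+// A minimal usage sketch (the value type is whatever the caller stores; an int
+// is used here purely for illustration):
+//
+//	c := newHeapCache(1024)
+//	c.put("some-key", 42)
+//	if value, updated, ok := c.get("some-key"); ok {
+//		// value.(int) == 42; updated is when the key was last put.
+//	}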
+type heapCache struct { + lock sync.RWMutex + size int + entries []cacheEntry + index map[string]int +} + +type cacheEntry struct { + updated time.Time + key string + value interface{} +} + +func newHeapCache(size int) *heapCache { + return &heapCache{ + size: size, + entries: make([]cacheEntry, 0, size), + index: make(map[string]int, size), + } +} + +func (c *heapCache) Len() int { + return len(c.entries) +} + +func (c *heapCache) Less(i, j int) bool { + return c.entries[i].updated.Before(c.entries[j].updated) +} + +func (c *heapCache) Swap(i, j int) { + c.entries[i], c.entries[j] = c.entries[j], c.entries[i] + c.index[c.entries[i].key] = i + c.index[c.entries[j].key] = j +} + +func (c *heapCache) Push(x interface{}) { + n := len(c.entries) + entry := x.(cacheEntry) + c.entries = append(c.entries, entry) + c.index[entry.key] = n +} + +func (c *heapCache) Pop() interface{} { + n := len(c.entries) + entry := c.entries[n-1] + c.entries = c.entries[0 : n-1] + delete(c.index, entry.key) + return entry +} + +func (c *heapCache) put(key string, value interface{}) { + if c.size == 0 { + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + // See if we already have the entry + index, ok := c.index[key] + if ok { + c.entries[index].updated = time.Now() + c.entries[index].value = value + heap.Fix(c, index) + return + } + + // Otherwise, see if we need to evict an entry (heap will update index). + if len(c.entries) >= c.size { + heap.Pop(c) + } + + // Put new entry on the heap (heap will update index). + heap.Push(c, cacheEntry{ + updated: time.Now(), + key: key, + value: value, + }) +} + +func (c *heapCache) get(key string) (value interface{}, updated time.Time, ok bool) { + if c.size == 0 { + return + } + + c.lock.RLock() + defer c.lock.RUnlock() + + var index int + index, ok = c.index[key] + if ok { + value = c.entries[index].value + updated = c.entries[index].updated + } + return +} diff --git a/pkg/chunk/heap_cache_test.go b/pkg/chunk/heap_cache_test.go new file mode 100644 index 00000000000..885e59c5651 --- /dev/null +++ b/pkg/chunk/heap_cache_test.go @@ -0,0 +1,55 @@ +package chunk + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHeapCache(t *testing.T) { + c := newHeapCache(10) + + // Check put / get works + for i := 0; i < 10; i++ { + c.put(strconv.Itoa(i), i) + } + require.Len(t, c.index, 10) + require.Len(t, c.entries, 10) + + for i := 0; i < 10; i++ { + value, _, ok := c.get(strconv.Itoa(i)) + require.True(t, ok) + require.Equal(t, i, value.(int)) + } + + // Check evictions + for i := 10; i < 15; i++ { + c.put(strconv.Itoa(i), i) + } + require.Len(t, c.index, 10) + require.Len(t, c.entries, 10) + + for i := 0; i < 5; i++ { + _, _, ok := c.get(strconv.Itoa(i)) + require.False(t, ok) + } + for i := 5; i < 15; i++ { + value, _, ok := c.get(strconv.Itoa(i)) + require.True(t, ok) + require.Equal(t, i, value.(int)) + } + + // Check updates work + for i := 5; i < 15; i++ { + c.put(strconv.Itoa(i), i*2) + } + require.Len(t, c.index, 10) + require.Len(t, c.entries, 10) + + for i := 5; i < 15; i++ { + value, _, ok := c.get(strconv.Itoa(i)) + require.True(t, ok) + require.Equal(t, i*2, value.(int)) + } +} diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 3dea4728db9..f6480691330 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -143,7 +143,8 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { // Return error if duplicate write and 
not metric name entry or series entry itemComponents := decodeRangeKey(items[i].rangeValue) if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) && - !bytes.Equal(itemComponents[3], seriesRangeKeyV1) { + !bytes.Equal(itemComponents[3], seriesRangeKeyV1) && + !bytes.Equal(itemComponents[3], labelSeriesRangeKeyV1) { return fmt.Errorf("Dupe write") } } @@ -160,6 +161,8 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { // QueryPages implements StorageClient. func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error { logger := util.WithContext(ctx, util.Logger) + level.Debug(logger).Log("msg", "QueryPages", "query", query.HashValue) + m.mtx.RLock() defer m.mtx.RUnlock() @@ -170,11 +173,12 @@ func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback items, ok := table.items[query.HashValue] if !ok { + level.Debug(logger).Log("msg", "not found") return nil } if query.RangeValuePrefix != nil { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "lookup prefix", "hash", query.HashValue, "range_prefix", query.RangeValuePrefix, "num_items", len(items)) + level.Debug(logger).Log("msg", "lookup prefix", "hash", query.HashValue, "range_prefix", query.RangeValuePrefix, "num_items", len(items)) // the smallest index i in [0, n) at which f(i) is true i := sort.Search(len(items), func(i int) bool { @@ -190,33 +194,33 @@ func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback return !bytes.HasPrefix(items[i+j].rangeValue, query.RangeValuePrefix) }) - level.Debug(util.WithContext(ctx, logger)).Log("msg", "found range", "from_inclusive", i, "to_exclusive", i+j) + level.Debug(logger).Log("msg", "found range", "from_inclusive", i, "to_exclusive", i+j) if i > len(items) || j == 0 { return nil } items = items[i : i+j] } else if query.RangeValueStart != nil { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "lookup range", "hash", query.HashValue, "range_start", query.RangeValueStart, "num_items", len(items)) + level.Debug(logger).Log("msg", "lookup range", "hash", query.HashValue, "range_start", query.RangeValueStart, "num_items", len(items)) // the smallest index i in [0, n) at which f(i) is true i := sort.Search(len(items), func(i int) bool { return bytes.Compare(items[i].rangeValue, query.RangeValueStart) >= 0 }) - level.Debug(util.WithContext(ctx, logger)).Log("msg", "found range [%d)", "index", i) + level.Debug(logger).Log("msg", "found range [%d)", "index", i) if i > len(items) { return nil } items = items[i:] } else { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "lookup", "hash", query.HashValue, "num_items", len(items)) + level.Debug(logger).Log("msg", "lookup", "hash", query.HashValue, "num_items", len(items)) } // Filters if query.ValueEqual != nil { - level.Debug(util.WithContext(ctx, logger)).Log("msg", "filter by equality", "value_equal", query.ValueEqual) + level.Debug(logger).Log("msg", "filter by equality", "value_equal", query.ValueEqual) filtered := make([]mockItem, 0) for _, v := range items { diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index 9657fa1cfbf..acb292bc96f 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -18,7 +18,10 @@ var ( chunkTimeRangeKeyV4 = []byte{'4'} chunkTimeRangeKeyV5 = []byte{'5'} metricNameRangeKeyV1 = []byte{'6'} - seriesRangeKeyV1 = []byte{'7'} + + // For v9 schema + seriesRangeKeyV1 = []byte{'7'} + labelSeriesRangeKeyV1 = []byte{'8'} ) // Errors @@ -37,6 +40,9 @@ 
type Schema interface { GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) + + // If the query resulted in series IDs, use this method to find chunks. + GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) } // IndexQuery describes a query for entries @@ -142,6 +148,14 @@ func v8Schema(cfg SchemaConfig) Schema { } } +// v9 schema index series, not chunks. +func v9Schema(cfg SchemaConfig) Schema { + return schema{ + cfg.dailyBuckets, + v9Entries{}, + } +} + // schema implements Schema given a bucketing function and and set of range key callbacks type schema struct { buckets func(from, through model.Time, userID string) []Bucket @@ -217,12 +231,27 @@ func (s schema) GetReadQueriesForMetricLabelValue(from, through model.Time, user return result, nil } +func (s schema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) { + var result []IndexQuery + + buckets := s.buckets(from, through, userID) + for _, bucket := range buckets { + entries, err := s.entries.GetChunksForSeries(bucket, seriesID) + if err != nil { + return nil, err + } + result = append(result, entries...) + } + return result, nil +} + type entries interface { GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) GetReadQueries(bucket Bucket) ([]IndexQuery, error) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) + GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) } type originalEntries struct{} @@ -283,6 +312,10 @@ func (originalEntries) GetReadMetricLabelValueQueries(bucket Bucket, metricName }, nil } +func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + type base64Entries struct { originalEntries } @@ -380,6 +413,10 @@ func (labelNameInHashKeyEntries) GetReadMetricLabelValueQueries(bucket Bucket, m }, nil } +func (labelNameInHashKeyEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + // v5Entries includes chunk end time in range key - see #298. type v5Entries struct{} @@ -441,6 +478,10 @@ func (v5Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. }, nil } +func (v5Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + // v6Entries fixes issues with v5 time encoding being wrong (see #337), and // moves label value out of range key (see #199). type v6Entries struct{} @@ -510,6 +551,10 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. 
}, nil } +func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + // v7Entries is a deprecated scherma initially used to support queries with no metric name. Use v8Entries instead. type v7Entries struct { v6Entries @@ -579,3 +624,89 @@ func (v8Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { }, }, nil } + +// v9Entries adds a layer of indirection between labels -> series -> chunks. +type v9Entries struct { +} + +func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + seriesID := sha256bytes(labels.String()) + encodedThroughBytes := encodeTime(bucket.through) + + entries := []IndexEntry{ + // Entry for metricName -> seriesID + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(metricName), + RangeValue: encodeRangeKey(seriesID, nil, nil, seriesRangeKeyV1), + }, + // Entry for seriesID -> chunkID + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(seriesID), + RangeValue: encodeRangeKey(encodedThroughBytes, nil, []byte(chunkID), chunkTimeRangeKeyV3), + }, + } + + // Entries for metricName:labelName -> hash(value):seriesID + // We use a hash of the value to limit its length. + for key, value := range labels { + if key == model.MetricNameLabel { + continue + } + valueHash := sha256bytes(string(value)) + entries = append(entries, IndexEntry{ + TableName: bucket.tableName, + HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, key), + RangeValue: encodeRangeKey(valueHash, seriesID, nil, labelSeriesRangeKeyV1), + Value: []byte(value), + }) + } + + return entries, nil +} + +func (v9Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { + return nil, ErrNoMetricNameNotSupported +} + +func (v9Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(metricName), + }, + }, nil +} + +func (v9Entries) GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) { + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName), + }, + }, nil +} + +func (v9Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { + valueHash := sha256bytes(string(labelValue)) + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName), + RangeValueStart: encodeRangeKey(valueHash), + ValueEqual: []byte(labelValue), + }, + }, nil +} + +func (v9Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) { + encodedFromBytes := encodeTime(bucket.from) + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(seriesID), + RangeValueStart: encodeRangeKey(encodedFromBytes), + }, + }, nil +} diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index d9e65862bef..da608a44f21 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -3,7 +3,6 @@ package chunk import ( "flag" "fmt" - "sort" "strconv" "time" @@ -31,6 +30,7 @@ type SchemaConfig struct { V6SchemaFrom util.DayValue V7SchemaFrom util.DayValue V8SchemaFrom util.DayValue + V9SchemaFrom util.DayValue 
// Master 'off-switch' for table capacity updates, e.g. when troubleshooting ThroughputUpdatesDisabled bool @@ -60,6 +60,7 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.V6SchemaFrom, "dynamodb.v6-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v6 schema.") f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema (Deprecated).") f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema (Deprecated).") + f.Var(&cfg.V9SchemaFrom, "dynamodb.v9-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v9 schema (Series indexing).") f.BoolVar(&cfg.ThroughputUpdatesDisabled, "table-manager.throughput-updates-disabled", false, "If true, disable all changes to DB capacity") f.DurationVar(&cfg.DynamoDBPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") @@ -282,202 +283,3 @@ func (cfg *PeriodicTableConfig) TableFor(t model.Time) string { ) return cfg.Prefix + strconv.Itoa(int(table)) } - -// compositeSchema is a Schema which delegates to various schemas depending -// on when they were activated. -type compositeSchema struct { - schemas []compositeSchemaEntry -} - -type compositeSchemaEntry struct { - start model.Time - Schema -} - -type byStart []compositeSchemaEntry - -func (a byStart) Len() int { return len(a) } -func (a byStart) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byStart) Less(i, j int) bool { return a[i].start < a[j].start } - -func newCompositeSchema(cfg SchemaConfig) (Schema, error) { - schemas := []compositeSchemaEntry{ - {0, v1Schema(cfg)}, - } - - if cfg.DailyBucketsFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.DailyBucketsFrom.Time, v2Schema(cfg)}) - } - - if cfg.Base64ValuesFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.Base64ValuesFrom.Time, v3Schema(cfg)}) - } - - if cfg.V4SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V4SchemaFrom.Time, v4Schema(cfg)}) - } - - if cfg.V5SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V5SchemaFrom.Time, v5Schema(cfg)}) - } - - if cfg.V6SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V6SchemaFrom.Time, v6Schema(cfg)}) - } - - if cfg.V7SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V7SchemaFrom.Time, v7Schema(cfg)}) - } - - if cfg.V8SchemaFrom.IsSet() { - schemas = append(schemas, compositeSchemaEntry{cfg.V8SchemaFrom.Time, v8Schema(cfg)}) - } - - if !sort.IsSorted(byStart(schemas)) { - return nil, fmt.Errorf("schemas not in time-sorted order") - } - - return compositeSchema{schemas}, nil -} - -func (c compositeSchema) forSchemasIndexQuery(from, through model.Time, callback func(from, through model.Time, schema Schema) ([]IndexQuery, error)) ([]IndexQuery, error) { - if len(c.schemas) == 0 { - return nil, nil - } - - // first, find the schema with the highest start _before or at_ from - i := sort.Search(len(c.schemas), func(i int) bool { - return c.schemas[i].start > from - }) - if i > 0 { - i-- - } else { - // This could happen if we get passed a sample from before 1970.
- i = 0 - from = c.schemas[0].start - } - - // next, find the schema with the lowest start _after_ through - j := sort.Search(len(c.schemas), func(j int) bool { - return c.schemas[j].start > through - }) - - min := func(a, b model.Time) model.Time { - if a < b { - return a - } - return b - } - - start := from - result := []IndexQuery{} - for ; i < j; i++ { - nextSchemaStarts := model.Latest - if i+1 < len(c.schemas) { - nextSchemaStarts = c.schemas[i+1].start - } - - // If the next schema starts at the same time as this one, - // skip this one. - if nextSchemaStarts == c.schemas[i].start { - continue - } - - end := min(through, nextSchemaStarts-1) - entries, err := callback(start, end, c.schemas[i].Schema) - if err != nil { - return nil, err - } - - result = append(result, entries...) - start = nextSchemaStarts - } - - return result, nil -} - -func (c compositeSchema) forSchemasIndexEntry(from, through model.Time, callback func(from, through model.Time, schema Schema) ([]IndexEntry, error)) ([]IndexEntry, error) { - if len(c.schemas) == 0 { - return nil, nil - } - - // first, find the schema with the highest start _before or at_ from - i := sort.Search(len(c.schemas), func(i int) bool { - return c.schemas[i].start > from - }) - if i > 0 { - i-- - } else { - // This could happen if we get passed a sample from before 1970. - i = 0 - from = c.schemas[0].start - } - - // next, find the schema with the lowest start _after_ through - j := sort.Search(len(c.schemas), func(j int) bool { - return c.schemas[j].start > through - }) - - min := func(a, b model.Time) model.Time { - if a < b { - return a - } - return b - } - - start := from - result := []IndexEntry{} - for ; i < j; i++ { - nextSchemaStarts := model.Latest - if i+1 < len(c.schemas) { - nextSchemaStarts = c.schemas[i+1].start - } - - // If the next schema starts at the same time as this one, - // skip this one. - if nextSchemaStarts == c.schemas[i].start { - continue - } - - end := min(through, nextSchemaStarts-1) - entries, err := callback(start, end, c.schemas[i].Schema) - if err != nil { - return nil, err - } - - result = append(result, entries...) 
- start = nextSchemaStarts - } - - return result, nil -} - -func (c compositeSchema) GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { - return c.forSchemasIndexEntry(from, through, func(from, through model.Time, schema Schema) ([]IndexEntry, error) { - return schema.GetWriteEntries(from, through, userID, metricName, labels, chunkID) - }) -} - -func (c compositeSchema) GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueries(from, through, userID) - }) -} - -func (c compositeSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueriesForMetric(from, through, userID, metricName) - }) -} - -func (c compositeSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName) - }) -} - -func (c compositeSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { - return c.forSchemasIndexQuery(from, through, func(from, through model.Time, schema Schema) ([]IndexQuery, error) { - return schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue) - }) -} diff --git a/pkg/chunk/schema_config_test.go b/pkg/chunk/schema_config_test.go index 2bf6e1d1d41..3ff0455e2d3 100644 --- a/pkg/chunk/schema_config_test.go +++ b/pkg/chunk/schema_config_test.go @@ -1,12 +1,10 @@ package chunk import ( - "fmt" "reflect" "testing" "github.com/prometheus/common/model" - "github.com/weaveworks/common/test" ) func TestHourlyBuckets(t *testing.T) { @@ -180,146 +178,3 @@ func TestDailyBuckets(t *testing.T) { }) } } - -func TestCompositeSchema(t *testing.T) { - type result struct { - from, through model.Time - schema Schema - } - collect := func(results *[]result) func(from, through model.Time, schema Schema) ([]IndexEntry, error) { - return func(from, through model.Time, schema Schema) ([]IndexEntry, error) { - *results = append(*results, result{from, through, schema}) - return nil, nil - } - } - cs := compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - {model.TimeFromUnix(100), mockSchema(2)}, - {model.TimeFromUnix(200), mockSchema(3)}, - }, - } - - for i, tc := range []struct { - cs compositeSchema - from, through int64 - want []result - }{ - // Test we have sensible results when there are no schema's defined - {compositeSchema{}, 0, 1, []result{}}, - - // Test we have sensible results when there is a single schema - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - }, - }, - 0, 10, - []result{ - {model.TimeFromUnix(0), model.TimeFromUnix(10), mockSchema(1)}, - }, - }, - - // Test we have sensible results for negative (ie pre 1970) times - { - compositeSchema{ - schemas: []compositeSchemaEntry{ 
- {model.TimeFromUnix(0), mockSchema(1)}, - }, - }, - -10, -9, - []result{}, - }, - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - }, - }, - -10, 10, - []result{ - {model.TimeFromUnix(0), model.TimeFromUnix(10), mockSchema(1)}, - }, - }, - - // Test we have sensible results when there is two schemas - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - {model.TimeFromUnix(100), mockSchema(2)}, - }, - }, - 34, 165, - []result{ - {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockSchema(1)}, - {model.TimeFromUnix(100), model.TimeFromUnix(165), mockSchema(2)}, - }, - }, - - // Test we get only one result when two schema start at same time - { - compositeSchema{ - schemas: []compositeSchemaEntry{ - {model.TimeFromUnix(0), mockSchema(1)}, - {model.TimeFromUnix(10), mockSchema(2)}, - {model.TimeFromUnix(10), mockSchema(3)}, - }, - }, - 0, 165, - []result{ - {model.TimeFromUnix(0), model.TimeFromUnix(10) - 1, mockSchema(1)}, - {model.TimeFromUnix(10), model.TimeFromUnix(165), mockSchema(3)}, - }, - }, - - // Test all the various combination we can get when there are three schemas - { - cs, 34, 65, - []result{ - {model.TimeFromUnix(34), model.TimeFromUnix(65), mockSchema(1)}, - }, - }, - - { - cs, 244, 6785, - []result{ - {model.TimeFromUnix(244), model.TimeFromUnix(6785), mockSchema(3)}, - }, - }, - - { - cs, 34, 165, - []result{ - {model.TimeFromUnix(34), model.TimeFromUnix(100) - 1, mockSchema(1)}, - {model.TimeFromUnix(100), model.TimeFromUnix(165), mockSchema(2)}, - }, - }, - - { - cs, 151, 264, - []result{ - {model.TimeFromUnix(151), model.TimeFromUnix(200) - 1, mockSchema(2)}, - {model.TimeFromUnix(200), model.TimeFromUnix(264), mockSchema(3)}, - }, - }, - - { - cs, 32, 264, - []result{ - {model.TimeFromUnix(32), model.TimeFromUnix(100) - 1, mockSchema(1)}, - {model.TimeFromUnix(100), model.TimeFromUnix(200) - 1, mockSchema(2)}, - {model.TimeFromUnix(200), model.TimeFromUnix(264), mockSchema(3)}, - }, - }, - } { - t.Run(fmt.Sprintf("TestSchemaComposite[%d]", i), func(t *testing.T) { - have := []result{} - tc.cs.forSchemasIndexEntry(model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), collect(&have)) - if !reflect.DeepEqual(tc.want, have) { - t.Fatalf("wrong schemas - %s", test.Diff(tc.want, have)) - } - }) - } -} diff --git a/pkg/chunk/schema_test.go b/pkg/chunk/schema_test.go index 70403f6d68c..e99ad66d286 100644 --- a/pkg/chunk/schema_test.go +++ b/pkg/chunk/schema_test.go @@ -17,24 +17,6 @@ import ( "github.com/weaveworks/cortex/pkg/util" ) -type mockSchema int - -func (mockSchema) GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { - return nil, nil -} -func (mockSchema) GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) { - return nil, nil -} -func (mockSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) { - return nil, nil -} -func (mockSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) { - return nil, nil -} -func (mockSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { - return nil, nil -} - type ByHashRangeKey []IndexEntry func (a 
ByHashRangeKey) Len() int { return len(a) } @@ -81,15 +63,6 @@ func TestSchemaHashKeys(t *testing.T) { From: util.NewDayValue(model.TimeFromUnix(5 * 24 * 60 * 60)), }, } - compositeSchema := func(dailyBucketsFrom model.Time) Schema { - cfgCp := cfg - cfgCp.DailyBucketsFrom = util.NewDayValue(dailyBucketsFrom) - schema, err := newCompositeSchema(cfgCp) - if err != nil { - t.Fatal(err) - } - return schema - } hourlyBuckets := v1Schema(cfg) dailyBuckets := v3Schema(cfg) labelBuckets := v4Schema(cfg) @@ -134,83 +107,6 @@ func TestSchemaHashKeys(t *testing.T) { mkResult(table, "userid:d%d:foo:bar", 0, 3), ), }, - - // Buckets are by hour until we reach the `dailyBucketsFrom`, after which they are by day. - { - compositeSchema(model.TimeFromUnix(0).Add(1 * 24 * time.Hour)), - 0, (3 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:%d:foo", 0, 1*24), - mkResult(table, "userid:d%d:foo", 1, 3), - ), - }, - - // Only the day part of `dailyBucketsFrom` matters, not the time part. - { - compositeSchema(model.TimeFromUnix(0).Add(2*24*time.Hour) - 1), - 0, (3 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:%d:foo", 0, 1*24), - mkResult(table, "userid:d%d:foo", 1, 3), - ), - }, - - // Moving dailyBucketsFrom to the previous day compared to the above makes 24 1-hour buckets disappear. - { - compositeSchema(model.TimeFromUnix(0).Add(1*24*time.Hour) - 1), - 0, (3 * 24 * 60 * 60) - 1, "foo", - mkResult(table, "userid:d%d:foo", 0, 3), - }, - - // If `dailyBucketsFrom` is after the interval, everything will be bucketed by hour. - { - compositeSchema(model.TimeFromUnix(0).Add(99 * 24 * time.Hour)), - 0, (2 * 24 * 60 * 60) - 1, "foo", - mkResult(table, "userid:%d:foo", 0, 2*24), - }, - - // Should only return daily buckets when dailyBucketsFrom is before the interval. - { - compositeSchema(model.TimeFromUnix(0)), - 1 * 24 * 60 * 60, (3 * 24 * 60 * 60) - 1, "foo", - mkResult(table, "userid:d%d:foo", 1, 3), - }, - - // Basic weekly- ables. - { - compositeSchema(model.TimeFromUnix(0)), - 5 * 24 * 60 * 60, (10 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(periodicPrefix+"2", "userid:d%d:foo", 5, 6), - mkResult(periodicPrefix+"3", "userid:d%d:foo", 6, 8), - mkResult(periodicPrefix+"4", "userid:d%d:foo", 8, 10), - ), - }, - - // Daily buckets + weekly tables. - { - compositeSchema(model.TimeFromUnix(0)), - 0, (10 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:d%d:foo", 0, 5), - mkResult(periodicPrefix+"2", "userid:d%d:foo", 5, 6), - mkResult(periodicPrefix+"3", "userid:d%d:foo", 6, 8), - mkResult(periodicPrefix+"4", "userid:d%d:foo", 8, 10), - ), - }, - - // Houly Buckets, then daily buckets, then weekly tables. 
- { - compositeSchema(model.TimeFromUnix(2 * 24 * 60 * 60)), - 0, (10 * 24 * 60 * 60) - 1, "foo", - mergeResults( - mkResult(table, "userid:%d:foo", 0, 2*24), - mkResult(table, "userid:d%d:foo", 2, 5), - mkResult(periodicPrefix+"2", "userid:d%d:foo", 5, 6), - mkResult(periodicPrefix+"3", "userid:d%d:foo", 6, 8), - mkResult(periodicPrefix+"4", "userid:d%d:foo", 8, 10), - ), - }, } { t.Run(fmt.Sprintf("TestSchemaHashKeys[%d]", i), func(t *testing.T) { have, err := tc.Schema.GetWriteEntries( @@ -498,7 +394,7 @@ func TestSchemaRangeKey(t *testing.T) { _, err := parseMetricNameRangeValue(entry.RangeValue, entry.Value) require.NoError(t, err) case ChunkTimeRangeValue: - _, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + _, _, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) require.NoError(t, err) case SeriesRangeValue: _, err := parseSeriesRangeValue(entry.RangeValue, entry.Value) diff --git a/pkg/chunk/schema_util.go b/pkg/chunk/schema_util.go index ec5d0d9eeaf..bbf5d5074dc 100644 --- a/pkg/chunk/schema_util.go +++ b/pkg/chunk/schema_util.go @@ -19,6 +19,11 @@ func metricSeriesID(m model.Metric) string { return string(encodeBase64Bytes(h[:])) } +func sha256bytes(s string) []byte { + h := sha256.Sum256([]byte(s)) + return encodeBase64Bytes(h[:]) +} + func encodeRangeKey(ss ...[]byte) []byte { length := 0 for _, s := range ss { @@ -127,45 +132,70 @@ func parseSeriesRangeValue(rangeValue []byte, value []byte) (model.Metric, error // parseChunkTimeRangeValue returns the chunkKey, labelValue and metadataInIndex // for chunk time range values. -func parseChunkTimeRangeValue(rangeValue []byte, value []byte) (string, model.LabelValue, bool, error) { +func parseChunkTimeRangeValue(rangeValue []byte, value []byte) ( + chunkID string, labelValue model.LabelValue, metadataInIndex bool, + isSeriesID bool, err error, +) { components := decodeRangeKey(rangeValue) switch { case len(components) < 3: - return "", "", false, errors.Errorf("invalid chunk time range value: %x", rangeValue) + err = errors.Errorf("invalid chunk time range value: %x", rangeValue) + return // v1 & v2 schema had three components - label name, label value and chunk ID. // No version number. case len(components) == 3: - return string(components[2]), model.LabelValue(components[1]), true, nil + chunkID = string(components[2]) + labelValue = model.LabelValue(components[1]) + metadataInIndex = true + return // v3 schema had four components - label name, label value, chunk ID and version. // "version" is 1 and label value is base64 encoded. case bytes.Equal(components[3], chunkTimeRangeKeyV1): - labelValue, err := decodeBase64Value(components[1]) - return string(components[2]), labelValue, false, err + chunkID = string(components[2]) + labelValue, err = decodeBase64Value(components[1]) + return // v4 schema wrote v3 range keys and a new range key - version 2, // with four components - , , chunk ID and version. 
case bytes.Equal(components[3], chunkTimeRangeKeyV2): - return string(components[2]), model.LabelValue(""), false, nil + chunkID = string(components[2]) + return // v5 schema version 3 range key is chunk end time, , chunk ID, version case bytes.Equal(components[3], chunkTimeRangeKeyV3): - return string(components[2]), model.LabelValue(""), false, nil + chunkID = string(components[2]) + return // v5 schema version 4 range key is chunk end time, label value, chunk ID, version case bytes.Equal(components[3], chunkTimeRangeKeyV4): - labelValue, err := decodeBase64Value(components[1]) - return string(components[2]), labelValue, false, err + chunkID = string(components[2]) + labelValue, err = decodeBase64Value(components[1]) + return // v6 schema added version 5 range keys, which have the label value written in // to the value, not the range key. So they are [chunk end time, , chunk ID, version]. case bytes.Equal(components[3], chunkTimeRangeKeyV5): - labelValue := model.LabelValue(value) - return string(components[2]), labelValue, false, nil + chunkID = string(components[2]) + labelValue = model.LabelValue(value) + return + + // v9 schema actually returns series IDs + case bytes.Equal(components[3], seriesRangeKeyV1): + chunkID = string(components[0]) + isSeriesID = true + return + + case bytes.Equal(components[3], labelSeriesRangeKeyV1): + chunkID = string(components[1]) + labelValue = model.LabelValue(value) + isSeriesID = true + return default: - return "", model.LabelValue(""), false, fmt.Errorf("unrecognised chunkTimeRangeKey version: '%v'", string(components[3])) + err = fmt.Errorf("unrecognised chunkTimeRangeKey version: '%v'", string(components[3])) + return + } } diff --git a/pkg/chunk/schema_util_test.go b/pkg/chunk/schema_util_test.go index cd308bbb941..c112967c017 100644 --- a/pkg/chunk/schema_util_test.go +++ b/pkg/chunk/schema_util_test.go @@ -99,7 +99,7 @@ func TestParseChunkTimeRangeValue(t *testing.T) { {[]byte("a1b2c3d4\x00Y29kZQ\x002:1484661279394:1484664879394\x004\x00"), "code", "2:1484661279394:1484664879394"}, } { - chunkID, labelValue, _, err := parseChunkTimeRangeValue(c.encoded, nil) + chunkID, labelValue, _, _, err := parseChunkTimeRangeValue(c.encoded, nil) require.NoError(t, err) assert.Equal(t, model.LabelValue(c.value), labelValue) assert.Equal(t, c.chunkID, chunkID) diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go new file mode 100644 index 00000000000..54b845740d3 --- /dev/null +++ b/pkg/chunk/series_store.go @@ -0,0 +1,246 @@ +package chunk + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/util" + "github.com/weaveworks/cortex/pkg/util/extract" +) + +var ( + errCardinalityExceeded = errors.New("cardinality limit exceeded") +) + +// seriesStore implements Store +type seriesStore struct { + store + cardinalityCache *heapCache +} + +func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { + fetcher, err := newChunkFetcher(cfg.CacheConfig, storage) + if err != nil { + return nil, err + } + + return &seriesStore{ + store: store{ + cfg: cfg, + storage: storage, + schema: schema, + chunkFetcher: fetcher, + }, + cardinalityCache: newHeapCache(cfg.CardinalityCacheSize), + }, nil +} + +// Get implements Store +func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk,
error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.Get") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers)) + + // Validate the query is within reasonable bounds. + shortcut, err := c.validateQuery(ctx, from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + + // Ensure this query includes a metric name. + metricNameMatcher, allMatchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) + if !ok || metricNameMatcher.Type != labels.MatchEqual { + return nil, fmt.Errorf("query must contain metric name") + } + level.Debug(log).Log("metric", metricNameMatcher.Value) + + // Fetch the series IDs from the index, based on non-empty matchers from + // the query. + _, matchers := util.SplitFiltersAndMatchers(allMatchers) + seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, metricNameMatcher.Value, matchers) + if err != nil { + return nil, err + } + level.Debug(log).Log("Series IDs", len(seriesIDs)) + + // Lookup the series in the index to get the chunks. + chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, seriesIDs) + if err != nil { + return nil, err + } + level.Debug(log).Log("Chunk IDs", len(chunkIDs)) + + // Protect ourselves against OOMing. + if len(chunkIDs) > c.cfg.QueryChunkLimit { + err := fmt.Errorf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunkIDs), c.cfg.QueryChunkLimit) + level.Error(log).Log("err", err) + return nil, err + } + + // Filter out chunks that are not in the selected time range. + chunks, err := c.convertChunkIDsToChunks(ctx, chunkIDs) + if err != nil { + return nil, err + } + filtered, keys := filterChunksByTime(from, through, chunks) + level.Debug(log).Log("Chunks post filtering", len(chunks)) + + // Now fetch the actual chunk data from Memcache / S3 + allChunks, err := c.fetchChunks(ctx, filtered, keys) + if err != nil { + level.Error(log).Log("err", err) + return nil, err + } + + // Filter out chunks based on the empty matchers in the query. 
+ filteredChunks := filterChunksByMatchers(allChunks, allMatchers) + return filteredChunks, nil +} + +func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, metricName string, matchers []*labels.Matcher) ([]string, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers)) + log.Finish() + + // Just get series for metric if there are no matchers + if len(matchers) == 0 { + return c.lookupSeriesByMetricNameMatcher(ctx, from, through, metricName, nil) + } + + // Otherwise get series which include other matchers + incomingIDs := make(chan []string) + incomingErrors := make(chan error) + for _, matcher := range matchers { + go func(matcher *labels.Matcher) { + ids, err := c.lookupSeriesByMetricNameMatcher(ctx, from, through, metricName, matcher) + if err != nil { + incomingErrors <- err + return + } + incomingIDs <- ids + }(matcher) + } + + // Receive chunkSets from all matchers + var ids []string + var lastErr error + var cardinalityExceededErrors int + for i := 0; i < len(matchers); i++ { + select { + case incoming := <-incomingIDs: + if ids == nil { + ids = incoming + } else { + ids = intersectStrings(ids, incoming) + } + case err := <-incomingErrors: + if err == errCardinalityExceeded { + cardinalityExceededErrors++ + } else { + lastErr = err + } + } + } + if cardinalityExceededErrors == len(matchers) { + return nil, errCardinalityExceeded + } else if lastErr != nil { + return nil, lastErr + } + + level.Debug(log).Log("msg", "post intersection", "ids", len(ids)) + return ids, nil +} + +func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, metricName string, matcher *labels.Matcher) ([]string, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupSeriesByMetricNameMatcher", "metricName", metricName, "matcher", matcher) + defer log.Span.Finish() + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + var queries []IndexQuery + if matcher == nil { + queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName)) + } else if matcher.Type != labels.MatchEqual { + queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name)) + } else { + queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value)) + } + if err != nil { + return nil, err + } + level.Debug(log).Log("queries", len(queries)) + + for _, query := range queries { + value, updated, ok := c.cardinalityCache.get(query.HashValue) + if !ok { + continue + } + entryAge := time.Now().Sub(updated) + cardinality := value.(int) + if entryAge < c.cfg.CardinalityCacheValidity && cardinality > c.cfg.CardinalityLimit { + return nil, errCardinalityExceeded + } + } + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + return nil, err + } + level.Debug(log).Log("entries", len(entries)) + + // TODO This is not correct, will overcount for queries > 24hrs + for _, query := range queries { + c.cardinalityCache.put(query.HashValue, len(entries)) + } + if len(entries) > c.cfg.CardinalityLimit { + return nil, errCardinalityExceeded + } + + ids, err := c.parseIndexEntries(ctx, entries, matcher) + if err != nil { + return nil, err + } + level.Debug(log).Log("ids", len(ids)) + + return ids, nil +} + +func (c 
*seriesStore) lookupChunksBySeries(ctx context.Context, from, through model.Time, seriesIDs []string) ([]string, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksBySeries") + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + level.Debug(log).Log("seriesIDs", len(seriesIDs)) + + queries := make([]IndexQuery, 0, len(seriesIDs)) + for _, seriesID := range seriesIDs { + qs, err := c.schema.GetChunksForSeries(from, through, userID, []byte(seriesID)) + if err != nil { + return nil, err + } + queries = append(queries, qs...) + } + level.Debug(log).Log("queries", len(queries)) + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + return nil, err + } + level.Debug(log).Log("entries", len(entries)) + + result, err := c.parseIndexEntries(ctx, entries, nil) + return result, err +}
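The heart of this change is the v9 schema's extra level of indirection: writes now produce metricName -> seriesID, metricName:labelName -> hash(value):seriesID and seriesID -> chunkID rows, and reads resolve matchers to series IDs (intersecting the per-matcher results) before looking up chunk IDs. The following is a minimal, self-contained Go sketch of that idea using a hypothetical map-backed index; it is illustrative only and does not use the real Bucket/IndexEntry types, range-key encodings or storage clients from this PR.

// Toy sketch of the v9 series-indexing idea, using hypothetical simplified
// types (a map-backed index instead of DynamoDB/Bigtable). Bucketing and
// range-key encoding are deliberately omitted.
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"sort"
	"strings"
)

// index maps a hash key to a set of values, standing in for one bucket's rows.
type index map[string]map[string]struct{}

func (idx index) put(hashKey, value string) {
	if idx[hashKey] == nil {
		idx[hashKey] = map[string]struct{}{}
	}
	idx[hashKey][value] = struct{}{}
}

// seriesID derives a deterministic ID for a whole label set, in the spirit of
// sha256bytes(labels.String()).
func seriesID(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%s=%q,", k, labels[k])
	}
	h := sha256.Sum256([]byte(b.String()))
	return base64.RawStdEncoding.EncodeToString(h[:])
}

// writeEntries mirrors the shape of v9Entries.GetWriteEntries: one row per
// metric name, one per label pair (pointing at the series), and one per chunk
// (keyed by the series).
func writeEntries(idx index, metricName string, labels map[string]string, chunkID string) {
	sid := seriesID(labels)
	idx.put("metric:"+metricName, sid) // metricName -> seriesID
	idx.put("series:"+sid, chunkID)    // seriesID   -> chunkID
	for name, value := range labels {
		if name == "__name__" {
			continue
		}
		idx.put(fmt.Sprintf("label:%s:%s=%s", metricName, name, value), sid) // label -> seriesID
	}
}

// lookupChunks mirrors the two-step read path: matchers -> series IDs
// (intersected), then series IDs -> chunk IDs.
func lookupChunks(idx index, metricName string, matchers map[string]string) []string {
	series := idx["metric:"+metricName]
	for name, value := range matchers {
		next := idx[fmt.Sprintf("label:%s:%s=%s", metricName, name, value)]
		intersected := map[string]struct{}{}
		for sid := range series {
			if _, ok := next[sid]; ok {
				intersected[sid] = struct{}{}
			}
		}
		series = intersected
	}
	var chunkIDs []string
	for sid := range series {
		for cid := range idx["series:"+sid] {
			chunkIDs = append(chunkIDs, cid)
		}
	}
	sort.Strings(chunkIDs)
	return chunkIDs
}

func main() {
	idx := index{}
	writeEntries(idx, "http_requests_total", map[string]string{"job": "api", "code": "200"}, "chunk-1")
	writeEntries(idx, "http_requests_total", map[string]string{"job": "api", "code": "500"}, "chunk-2")
	fmt.Println(lookupChunks(idx, "http_requests_total", map[string]string{"code": "500"}))
	// Output: [chunk-2]
}

Running the sketch prints [chunk-2]: only the chunk belonging to the series matching both the metric name and the code="500" matcher survives the intersection, which is exactly the work lookupSeriesByMetricNameMatchers and lookupChunksBySeries split between them in the real store.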
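Separately, the new store.cardinality-* flags guard index lookups: before running a label query the store consults an in-memory cache of recently observed entry counts per hash key and fails fast with errCardinalityExceeded when a fresh measurement is over the limit, then records the new count after the query. Below is a rough, hypothetical sketch of that guard; the real code uses heapCache inside lookupSeriesByMetricNameMatcher, and the names here are illustrative, not the PR's API.

// Hypothetical sketch of the cardinality guard, under the assumption that a
// plain map with timestamps is an adequate stand-in for heapCache.
package main

import (
	"errors"
	"fmt"
	"time"
)

var errCardinalityExceeded = errors.New("cardinality limit exceeded")

type cardinalityEntry struct {
	count   int
	updated time.Time
}

type cardinalityGuard struct {
	cache    map[string]cardinalityEntry
	validity time.Duration // store.cardinality-cache-validity
	limit    int           // store.cardinality-limit
}

// check fails fast if a recent measurement for this hash key was over the
// limit; otherwise the caller proceeds with the real index query.
func (g *cardinalityGuard) check(hashKey string) error {
	e, ok := g.cache[hashKey]
	if !ok || time.Since(e.updated) >= g.validity {
		return nil // no fresh measurement, let the query run
	}
	if e.count > g.limit {
		return errCardinalityExceeded
	}
	return nil
}

// record stores the observed entry count after a query has run. As the TODO
// in the diff notes, caching per-query counts like this over-counts for
// queries spanning more than one bucket.
func (g *cardinalityGuard) record(hashKey string, count int) error {
	g.cache[hashKey] = cardinalityEntry{count: count, updated: time.Now()}
	if count > g.limit {
		return errCardinalityExceeded
	}
	return nil
}

func main() {
	g := &cardinalityGuard{cache: map[string]cardinalityEntry{}, validity: time.Hour, limit: 100000}
	fmt.Println(g.check("userid:d17278:http_requests_total:job"))          // <nil>: nothing cached yet
	fmt.Println(g.record("userid:d17278:http_requests_total:job", 250000)) // over the limit
	fmt.Println(g.check("userid:d17278:http_requests_total:job"))          // now rejected from cache
}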