diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 3edfeb9cdc0..c8abb23441c 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -178,13 +178,12 @@ func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers . // Fetch metric name chunks if the matcher is of type equal, metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) - if ok && metricNameMatcher.Type == labels.MatchEqual { - log.Span.SetTag("metric", metricNameMatcher.Value) - return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value) + if !ok || metricNameMatcher.Type != labels.MatchEqual { + return nil, fmt.Errorf("query must contain metric name") + } - // Otherwise we consult the metric name index first and then create queries for each matching metric name. - return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher) + log.Span.SetTag("metric", metricNameMatcher.Value) + return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value) } func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time) (shortcut bool, err error) { @@ -253,70 +252,6 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim return filteredChunks, nil } -func (c *store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) { - // Get all series from the index - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, err - } - seriesQueries, err := c.schema.GetReadQueries(from, through, userID) - if err != nil { - return nil, err - } - seriesEntries, err := c.lookupEntriesByQueries(ctx, seriesQueries) - if err != nil { - return nil, err - } - - chunks := make([]Chunk, 0, len(seriesEntries)) -outer: - for _, seriesEntry := range seriesEntries { - metric, err := parseSeriesRangeValue(seriesEntry.RangeValue, seriesEntry.Value) - if 
err != nil { - return nil, err - } - - // Apply metric name matcher - if metricNameMatcher != nil && !metricNameMatcher.Matches(string(metric[model.LabelName(metricNameMatcher.Name)])) { - continue outer - } - - // Apply matchers - for _, matcher := range allMatchers { - if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) { - continue outer - } - } - - var matchers []*labels.Matcher - for labelName, labelValue := range metric { - if labelName == "__name__" { - continue - } - - matcher, err := labels.NewMatcher(labels.MatchEqual, string(labelName), string(labelValue)) - if err != nil { - return nil, err - } - matchers = append(matchers, matcher) - } - - cs, err := c.getMetricNameChunks(ctx, from, through, matchers, string(metric[model.MetricNameLabel])) - if err != nil { - return nil, err - } - - for _, chunk := range cs { - // getMetricNameChunks() may have selected too many metrics - metrics that match all matchers, - // but also have additional labels. We don't want to return those. 
- if chunk.Metric.Equal(metric) { - chunks = append(chunks, chunk) - } - } - } - return chunks, nil -} - func (c *store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") defer log.Finish() diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 5171aacfc61..62e7e632920 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -37,8 +37,6 @@ var schemas = []struct { {"v4 schema", v4Schema, newStore, true}, {"v5 schema", v5Schema, newStore, true}, {"v6 schema", v6Schema, newStore, true}, - {"v7 schema", v7Schema, newStore, true}, - {"v8 schema", v8Schema, newStore, false}, {"v9 schema", v9Schema, newSeriesStore, true}, } diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index ebbd2f38c0e..5c5423c601a 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -93,24 +93,6 @@ func SchemaOpts(cfg StoreConfig, schemaCfg SchemaConfig) []SchemaOpt { }) } - if schemaCfg.V7SchemaFrom.IsSet() { - opts = append(opts, SchemaOpt{ - From: schemaCfg.V7SchemaFrom.Time, - NewStore: func(storage StorageClient) (Store, error) { - return newStore(cfg, v7Schema(schemaCfg), storage) - }, - }) - } - - if schemaCfg.V8SchemaFrom.IsSet() { - opts = append(opts, SchemaOpt{ - From: schemaCfg.V8SchemaFrom.Time, - NewStore: func(storage StorageClient) (Store, error) { - return newStore(cfg, v8Schema(schemaCfg), storage) - }, - }) - } - if schemaCfg.V9SchemaFrom.IsSet() { opts = append(opts, SchemaOpt{ From: schemaCfg.V9SchemaFrom.Time, diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index acb292bc96f..00d09e9f893 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -1,14 +1,11 @@ package chunk import ( - "crypto/sha1" - "encoding/json" "errors" "fmt" "strings" "github.com/prometheus/common/model" - 
"github.com/weaveworks/cortex/pkg/util/extract" ) var ( @@ -22,11 +19,9 @@ var ( // For v9 schema seriesRangeKeyV1 = []byte{'7'} labelSeriesRangeKeyV1 = []byte{'8'} -) -// Errors -var ( - ErrNoMetricNameNotSupported = errors.New("metric name required for pre-v8 schemas") + // ErrNotSupported when a schema doesn't support that particular lookup. + ErrNotSupported = errors.New("not supported") ) // Schema interface defines methods to calculate the hash and range keys needed @@ -36,7 +31,6 @@ type Schema interface { GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) // When doing a read, use these methods to return the list of entries you should query - GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) @@ -132,22 +126,6 @@ func v6Schema(cfg SchemaConfig) Schema { } } -// DEPRECATED: v7 schema is an extension of v6, with support for queries with no metric names, but is broken -func v7Schema(cfg SchemaConfig) Schema { - return schema{ - cfg.dailyBuckets, - v7Entries{}, - } -} - -// DEPRECATED: v8 schema is an extension of v6, with support for a labelset/series index, but is too slow in practice -func v8Schema(cfg SchemaConfig) Schema { - return schema{ - cfg.dailyBuckets, - v8Entries{}, - } -} - // v9 schema index series, not chunks. 
func v9Schema(cfg SchemaConfig) Schema { return schema{ @@ -175,20 +153,6 @@ func (s schema) GetWriteEntries(from, through model.Time, userID string, metricN return result, nil } -func (s schema) GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) { - var result []IndexQuery - - buckets := s.buckets(from, through, userID) - for _, bucket := range buckets { - entries, err := s.entries.GetReadQueries(bucket) - if err != nil { - return nil, err - } - result = append(result, entries...) - } - return result, nil -} - func (s schema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) { var result []IndexQuery @@ -247,7 +211,6 @@ func (s schema) GetChunksForSeries(from, through model.Time, userID string, seri type entries interface { GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) - GetReadQueries(bucket Bucket) ([]IndexQuery, error) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) @@ -275,10 +238,6 @@ func (originalEntries) GetWriteEntries(bucket Bucket, metricName model.LabelValu return result, nil } -func (originalEntries) GetReadQueries(_ Bucket) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported -} - func (originalEntries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { @@ -313,7 +272,7 @@ func (originalEntries) GetReadMetricLabelValueQueries(bucket Bucket, metricName } func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported + return nil, ErrNotSupported } 
type base64Entries struct { @@ -338,10 +297,6 @@ func (base64Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, return result, nil } -func (base64Entries) GetReadQueries(_ Bucket) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported -} - func (base64Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { encodedBytes := encodeBase64Value(labelValue) return []IndexQuery{ @@ -380,10 +335,6 @@ func (labelNameInHashKeyEntries) GetWriteEntries(bucket Bucket, metricName model return entries, nil } -func (labelNameInHashKeyEntries) GetReadQueries(_ Bucket) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported -} - func (labelNameInHashKeyEntries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { @@ -414,7 +365,7 @@ func (labelNameInHashKeyEntries) GetReadMetricLabelValueQueries(bucket Bucket, m } func (labelNameInHashKeyEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported + return nil, ErrNotSupported } // v5Entries includes chunk end time in range key - see #298. @@ -447,10 +398,6 @@ func (v5Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab return entries, nil } -func (v5Entries) GetReadQueries(_ Bucket) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported -} - func (v5Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { @@ -479,7 +426,7 @@ func (v5Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. 
} func (v5Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported + return nil, ErrNotSupported } // v6Entries fixes issues with v5 time encoding being wrong (see #337), and @@ -513,10 +460,6 @@ func (v6Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab return entries, nil } -func (v6Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported -} - func (v6Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { encodedFromBytes := encodeTime(bucket.from) return []IndexQuery{ @@ -552,77 +495,7 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. } func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported -} - -// v7Entries is a deprecated scherma initially used to support queries with no metric name. Use v8Entries instead. -type v7Entries struct { - v6Entries -} - -func (entries v7Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { - indexEntries, err := entries.v6Entries.GetWriteEntries(bucket, metricName, labels, chunkID) - if err != nil { - return nil, err - } - - metricName, err = extract.MetricNameFromMetric(labels) - if err != nil { - return nil, err - } - metricNameHashBytes := sha1.Sum([]byte(metricName)) - - // Add IndexEntry for metric name with userID:bigBucket HashValue - indexEntries = append(indexEntries, IndexEntry{ - TableName: bucket.tableName, - HashValue: bucket.hashKey, - RangeValue: encodeRangeKey(encodeBase64Bytes(metricNameHashBytes[:]), nil, nil, metricNameRangeKeyV1), - Value: []byte(metricName), - }) - - return indexEntries, nil -} - -func (v7Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { - // Replaced with v8Schema series index - return nil, ErrNoMetricNameNotSupported -} - -// v8Entries 
supports queries with no metric name by using a series index. -type v8Entries struct { - v6Entries -} - -func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { - indexEntries, err := entries.v6Entries.GetWriteEntries(bucket, metricName, labels, chunkID) - if err != nil { - return nil, err - } - - seriesID := metricSeriesID(labels) - seriesBytes, err := json.Marshal(labels) - if err != nil { - return nil, err - } - - // Add IndexEntry for series with userID:bigBucket HashValue - indexEntries = append(indexEntries, IndexEntry{ - TableName: bucket.tableName, - HashValue: bucket.hashKey, - RangeValue: encodeRangeKey([]byte(seriesID), nil, nil, seriesRangeKeyV1), - Value: seriesBytes, - }) - - return indexEntries, nil -} - -func (v8Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { - return []IndexQuery{ - { - TableName: bucket.tableName, - HashValue: bucket.hashKey, - }, - }, nil + return nil, ErrNotSupported } // v9Entries adds a layer of indirection between labels -> series -> chunks. 
@@ -666,10 +539,6 @@ func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab return entries, nil } -func (v9Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { - return nil, ErrNoMetricNameNotSupported -} - func (v9Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index 0cf2993cb12..4ee6baa85b7 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -28,8 +28,6 @@ type SchemaConfig struct { V4SchemaFrom util.DayValue V5SchemaFrom util.DayValue V6SchemaFrom util.DayValue - V7SchemaFrom util.DayValue - V8SchemaFrom util.DayValue V9SchemaFrom util.DayValue BigtableColumnKeyFrom util.DayValue @@ -59,8 +57,6 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.V4SchemaFrom, "dynamodb.v4-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v4 schema.") f.Var(&cfg.V5SchemaFrom, "dynamodb.v5-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v5 schema.") f.Var(&cfg.V6SchemaFrom, "dynamodb.v6-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v6 schema.") - f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema (Deprecated).") - f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema (Deprecated).") f.Var(&cfg.V9SchemaFrom, "dynamodb.v9-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v9 schema (Series indexing).") f.Var(&cfg.BigtableColumnKeyFrom, "bigtable.column-key-from", "The date (in the format YYYY-MM-DD) after which we use bigtable column keys.") diff --git a/pkg/chunk/schema_test.go b/pkg/chunk/schema_test.go index e99ad66d286..81e36c961e8 100644 --- a/pkg/chunk/schema_test.go +++ b/pkg/chunk/schema_test.go @@ -2,9 +2,7 @@ package chunk 
import ( "bytes" - "crypto/sha1" "encoding/base64" - "encoding/json" "fmt" "reflect" "sort" @@ -195,20 +193,13 @@ func TestSchemaRangeKey(t *testing.T) { labelBuckets = v4Schema(cfg) tsRangeKeys = v5Schema(cfg) v6RangeKeys = v6Schema(cfg) - v7RangeKeys = v7Schema(cfg) - v8RangeKeys = v8Schema(cfg) metric = model.Metric{ model.MetricNameLabel: metricName, "bar": "bary", "baz": "bazy", } - fooSha1Hash = sha1.Sum([]byte("foo")) ) - seriesID := metricSeriesID(metric) - metricBytes, err := json.Marshal(metric) - require.NoError(t, err) - mkEntries := func(hashKey string, callback func(labelName model.LabelName, labelValue model.LabelValue) ([]byte, []byte)) []IndexEntry { result := []IndexEntry{} for labelName, labelValue := range metric { @@ -312,62 +303,6 @@ func TestSchemaRangeKey(t *testing.T) { }, }, }, - { - v7RangeKeys, - []IndexEntry{ - { - TableName: table, - HashValue: "userid:d0", - RangeValue: append(encodeBase64Bytes(fooSha1Hash[:]), []byte("\x00\x00\x006\x00")...), - Value: []byte("foo"), - }, - { - TableName: table, - HashValue: "userid:d0:foo", - RangeValue: []byte("0036ee7f\x00\x00chunkID\x003\x00"), - }, - { - TableName: table, - HashValue: "userid:d0:foo:bar", - RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"), - Value: []byte("bary"), - }, - { - TableName: table, - HashValue: "userid:d0:foo:baz", - RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"), - Value: []byte("bazy"), - }, - }, - }, - { - v8RangeKeys, - []IndexEntry{ - { - TableName: table, - HashValue: "userid:d0", - RangeValue: append([]byte(seriesID), []byte("\x00\x00\x007\x00")...), - Value: metricBytes, - }, - { - TableName: table, - HashValue: "userid:d0:foo", - RangeValue: []byte("0036ee7f\x00\x00chunkID\x003\x00"), - }, - { - TableName: table, - HashValue: "userid:d0:foo:bar", - RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"), - Value: []byte("bary"), - }, - { - TableName: table, - HashValue: "userid:d0:foo:baz", - RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"), 
- Value: []byte("bazy"), - }, - }, - }, } { t.Run(fmt.Sprintf("TestSchameRangeKey[%d]", i), func(t *testing.T) { have, err := tc.Schema.GetWriteEntries(