Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 4 additions & 69 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,12 @@ func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers .

// Fetch metric name chunks if the matcher is of type equal,
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers)
if ok && metricNameMatcher.Type == labels.MatchEqual {
log.Span.SetTag("metric", metricNameMatcher.Value)
return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value)
	if !ok || metricNameMatcher.Type != labels.MatchEqual {
return nil, fmt.Errorf("query must contain metric name")
}

// Otherwise we consult the metric name index first and then create queries for each matching metric name.
return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher)
log.Span.SetTag("metric", metricNameMatcher.Value)
return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value)
}

func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time) (shortcut bool, err error) {
Expand Down Expand Up @@ -253,70 +252,6 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim
return filteredChunks, nil
}

// getSeriesChunks serves queries that have no equality matcher on the metric
// name: it lists every series entry in the index for [from, through], filters
// the decoded label sets against the supplied matchers, and fetches chunks for
// each surviving series via getMetricNameChunks.
func (c *store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) {
	// Get all series from the index
	userID, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil, err
	}
	seriesQueries, err := c.schema.GetReadQueries(from, through, userID)
	if err != nil {
		return nil, err
	}
	seriesEntries, err := c.lookupEntriesByQueries(ctx, seriesQueries)
	if err != nil {
		return nil, err
	}

	chunks := make([]Chunk, 0, len(seriesEntries))
outer:
	for _, seriesEntry := range seriesEntries {
		metric, err := parseSeriesRangeValue(seriesEntry.RangeValue, seriesEntry.Value)
		if err != nil {
			return nil, err
		}

		// Apply metric name matcher (may be non-equality, e.g. a regex —
		// that is why this path exists instead of a direct name lookup).
		if metricNameMatcher != nil && !metricNameMatcher.Matches(string(metric[model.LabelName(metricNameMatcher.Name)])) {
			continue outer
		}

		// Apply matchers; a series failing any matcher is skipped entirely.
		for _, matcher := range allMatchers {
			if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) {
				continue outer
			}
		}

		// Build exact-equality matchers for every label of this series
		// (except __name__, which is passed separately below), so the
		// per-metric lookup is narrowed to this one series.
		var matchers []*labels.Matcher
		for labelName, labelValue := range metric {
			if labelName == "__name__" {
				continue
			}

			matcher, err := labels.NewMatcher(labels.MatchEqual, string(labelName), string(labelValue))
			if err != nil {
				return nil, err
			}
			matchers = append(matchers, matcher)
		}

		cs, err := c.getMetricNameChunks(ctx, from, through, matchers, string(metric[model.MetricNameLabel]))
		if err != nil {
			return nil, err
		}

		for _, chunk := range cs {
			// getMetricNameChunks() may have selected too many metrics - metrics that match all matchers,
			// but also have additional labels. We don't want to return those.
			if chunk.Metric.Equal(metric) {
				chunks = append(chunks, chunk)
			}
		}
	}
	return chunks, nil
}

func (c *store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) {
log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName")
defer log.Finish()
Expand Down
2 changes: 0 additions & 2 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ var schemas = []struct {
{"v4 schema", v4Schema, newStore, true},
{"v5 schema", v5Schema, newStore, true},
{"v6 schema", v6Schema, newStore, true},
{"v7 schema", v7Schema, newStore, true},
{"v8 schema", v8Schema, newStore, false},
{"v9 schema", v9Schema, newSeriesStore, true},
}

Expand Down
18 changes: 0 additions & 18 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,6 @@ func SchemaOpts(cfg StoreConfig, schemaCfg SchemaConfig) []SchemaOpt {
})
}

if schemaCfg.V7SchemaFrom.IsSet() {
opts = append(opts, SchemaOpt{
From: schemaCfg.V7SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v7Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V8SchemaFrom.IsSet() {
opts = append(opts, SchemaOpt{
From: schemaCfg.V8SchemaFrom.Time,
NewStore: func(storage StorageClient) (Store, error) {
return newStore(cfg, v8Schema(schemaCfg), storage)
},
})
}

if schemaCfg.V9SchemaFrom.IsSet() {
opts = append(opts, SchemaOpt{
From: schemaCfg.V9SchemaFrom.Time,
Expand Down
143 changes: 6 additions & 137 deletions pkg/chunk/schema.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package chunk

import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/prometheus/common/model"
"github.com/weaveworks/cortex/pkg/util/extract"
)

var (
Expand All @@ -22,11 +19,9 @@ var (
// For v9 schema
seriesRangeKeyV1 = []byte{'7'}
labelSeriesRangeKeyV1 = []byte{'8'}
)

// Errors
var (
ErrNoMetricNameNotSupported = errors.New("metric name required for pre-v8 schemas")
// ErrNotSupported when a schema doesn't support that particular lookup.
ErrNotSupported = errors.New("not supported")
)

// Schema interface defines methods to calculate the hash and range keys needed
Expand All @@ -36,7 +31,6 @@ type Schema interface {
GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error)

// When doing a read, use these methods to return the list of entries you should query
GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error)
GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error)
GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error)
GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error)
Expand Down Expand Up @@ -132,22 +126,6 @@ func v6Schema(cfg SchemaConfig) Schema {
}
}

// DEPRECATED: v7 schema is an extension of v6, with support for queries with no metric names, but is broken
func v7Schema(cfg SchemaConfig) Schema {
return schema{
cfg.dailyBuckets,
v7Entries{},
}
}

// DEPRECATED: v8 schema is an extension of v6, with support for a labelset/series index, but is too slow in practice
func v8Schema(cfg SchemaConfig) Schema {
return schema{
cfg.dailyBuckets,
v8Entries{},
}
}

// v9 schema index series, not chunks.
func v9Schema(cfg SchemaConfig) Schema {
return schema{
Expand Down Expand Up @@ -175,20 +153,6 @@ func (s schema) GetWriteEntries(from, through model.Time, userID string, metricN
return result, nil
}

// GetReadQueries returns the index queries needed to serve a read with no
// metric-name constraint, one batch per time bucket covering [from, through].
func (s schema) GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) {
	var queries []IndexQuery
	for _, b := range s.buckets(from, through, userID) {
		qs, err := s.entries.GetReadQueries(b)
		if err != nil {
			return nil, err
		}
		queries = append(queries, qs...)
	}
	return queries, nil
}

func (s schema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) {
var result []IndexQuery

Expand Down Expand Up @@ -247,7 +211,6 @@ func (s schema) GetChunksForSeries(from, through model.Time, userID string, seri

type entries interface {
GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error)
GetReadQueries(bucket Bucket) ([]IndexQuery, error)
GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error)
GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error)
GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error)
Expand Down Expand Up @@ -275,10 +238,6 @@ func (originalEntries) GetWriteEntries(bucket Bucket, metricName model.LabelValu
return result, nil
}

// GetReadQueries is unsupported: the original schema has no index for
// queries without a metric name.
func (originalEntries) GetReadQueries(_ Bucket) ([]IndexQuery, error) {
	return nil, ErrNoMetricNameNotSupported
}

func (originalEntries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) {
return []IndexQuery{
{
Expand Down Expand Up @@ -313,7 +272,7 @@ func (originalEntries) GetReadMetricLabelValueQueries(bucket Bucket, metricName
}

func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNoMetricNameNotSupported
return nil, ErrNotSupported
}

type base64Entries struct {
Expand All @@ -338,10 +297,6 @@ func (base64Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue,
return result, nil
}

// GetReadQueries is unsupported: the base64 schema has no index for
// queries without a metric name.
func (base64Entries) GetReadQueries(_ Bucket) ([]IndexQuery, error) {
	return nil, ErrNoMetricNameNotSupported
}

func (base64Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) {
encodedBytes := encodeBase64Value(labelValue)
return []IndexQuery{
Expand Down Expand Up @@ -380,10 +335,6 @@ func (labelNameInHashKeyEntries) GetWriteEntries(bucket Bucket, metricName model
return entries, nil
}

// GetReadQueries is unsupported: this schema has no index for queries
// without a metric name.
func (labelNameInHashKeyEntries) GetReadQueries(_ Bucket) ([]IndexQuery, error) {
	return nil, ErrNoMetricNameNotSupported
}

func (labelNameInHashKeyEntries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) {
return []IndexQuery{
{
Expand Down Expand Up @@ -414,7 +365,7 @@ func (labelNameInHashKeyEntries) GetReadMetricLabelValueQueries(bucket Bucket, m
}

func (labelNameInHashKeyEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNoMetricNameNotSupported
return nil, ErrNotSupported
}

// v5Entries includes chunk end time in range key - see #298.
Expand Down Expand Up @@ -447,10 +398,6 @@ func (v5Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab
return entries, nil
}

// GetReadQueries is unsupported: the v5 schema has no index for queries
// without a metric name.
func (v5Entries) GetReadQueries(_ Bucket) ([]IndexQuery, error) {
	return nil, ErrNoMetricNameNotSupported
}

func (v5Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) {
return []IndexQuery{
{
Expand Down Expand Up @@ -479,7 +426,7 @@ func (v5Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.
}

func (v5Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNoMetricNameNotSupported
return nil, ErrNotSupported
}

// v6Entries fixes issues with v5 time encoding being wrong (see #337), and
Expand Down Expand Up @@ -513,10 +460,6 @@ func (v6Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab
return entries, nil
}

// GetReadQueries is unsupported: the v6 schema has no index for queries
// without a metric name. The parameter is blank-named for consistency with
// the sibling implementations, since it is unused.
func (v6Entries) GetReadQueries(_ Bucket) ([]IndexQuery, error) {
	return nil, ErrNoMetricNameNotSupported
}

func (v6Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) {
encodedFromBytes := encodeTime(bucket.from)
return []IndexQuery{
Expand Down Expand Up @@ -552,77 +495,7 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.
}

func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNoMetricNameNotSupported
}

// v7Entries is a deprecated schema initially used to support queries with no
// metric name. Use v8Entries instead.
type v7Entries struct {
	v6Entries
}

// GetWriteEntries emits the v6 write entries plus one extra entry per chunk
// that records the metric name under the bucket-wide hash key, keyed by a
// base64-encoded SHA-1 of the name.
func (entries v7Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
	indexEntries, err := entries.v6Entries.GetWriteEntries(bucket, metricName, labels, chunkID)
	if err != nil {
		return nil, err
	}

	// Re-derive the metric name from the label set rather than trusting the
	// argument — NOTE(review): presumably these always agree; confirm.
	metricName, err = extract.MetricNameFromMetric(labels)
	if err != nil {
		return nil, err
	}
	metricNameHashBytes := sha1.Sum([]byte(metricName))

	// Add IndexEntry for metric name with userID:bigBucket HashValue
	indexEntries = append(indexEntries, IndexEntry{
		TableName: bucket.tableName,
		HashValue: bucket.hashKey,
		RangeValue: encodeRangeKey(encodeBase64Bytes(metricNameHashBytes[:]), nil, nil, metricNameRangeKeyV1),
		Value: []byte(metricName),
	})

	return indexEntries, nil
}

// GetReadQueries is unsupported for v7. The parameter is blank-named for
// consistency with the sibling implementations, since it is unused.
func (v7Entries) GetReadQueries(_ Bucket) ([]IndexQuery, error) {
	// Replaced with v8Schema series index
	return nil, ErrNoMetricNameNotSupported
}

// v8Entries supports queries with no metric name by using a series index,
// built on top of the v6 entries it embeds.
type v8Entries struct {
	v6Entries
}

// GetWriteEntries emits the v6 write entries plus one extra entry per chunk
// that stores the series' JSON-encoded label set under the bucket-wide hash
// key, keyed by the series ID.
func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
	indexEntries, err := entries.v6Entries.GetWriteEntries(bucket, metricName, labels, chunkID)
	if err != nil {
		return nil, err
	}

	seriesID := metricSeriesID(labels)
	seriesBytes, err := json.Marshal(labels)
	if err != nil {
		return nil, err
	}

	// Add IndexEntry for series with userID:bigBucket HashValue
	indexEntries = append(indexEntries, IndexEntry{
		TableName: bucket.tableName,
		HashValue: bucket.hashKey,
		RangeValue: encodeRangeKey([]byte(seriesID), nil, nil, seriesRangeKeyV1),
		Value: seriesBytes,
	})

	return indexEntries, nil
}

func (v8Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) {
return []IndexQuery{
{
TableName: bucket.tableName,
HashValue: bucket.hashKey,
},
}, nil
return nil, ErrNotSupported
}

// v9Entries adds a layer of indirection between labels -> series -> chunks.
Expand Down Expand Up @@ -666,10 +539,6 @@ func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab
return entries, nil
}

// GetReadQueries is unsupported: v9 reads go through the series index
// methods instead. The parameter is blank-named for consistency with the
// sibling implementations, since it is unused.
func (v9Entries) GetReadQueries(_ Bucket) ([]IndexQuery, error) {
	return nil, ErrNoMetricNameNotSupported
}

func (v9Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) {
return []IndexQuery{
{
Expand Down
Loading