diff --git a/Gopkg.lock b/Gopkg.lock index 2f07af9fe40..5f11a7a54ee 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -680,7 +680,7 @@ "util/treecache", "web/api/v1" ] - revision = "25e2d9f152b634d65123ef11342aceb23e66a1a2" + revision = "504acf4a0aec394fa7993dc2fe5744ef59f97b2c" [[projects]] branch = "master" diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 7d9fac51999..096ee7f25d7 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -11,7 +11,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/web/api/v1" "github.com/prometheus/tsdb" "google.golang.org/grpc" @@ -124,8 +123,7 @@ func main() { tableManager.Start() defer tableManager.Stop() - engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout) - queryable := querier.NewQueryable(dist, chunkStore) + queryable, engine := querier.Make(querierConfig, dist, chunkStore) if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" { rulesAPI, err := ruler.NewRulesAPI(configStoreConfig) @@ -186,7 +184,7 @@ func main() { subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter() subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter)) - subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(queryable.RemoteReadHandler))) + subrouter.Path("/read").Handler(activeMiddleware.Wrap(querier.RemoteReadHandler(queryable))) subrouter.Path("/validate_expr").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.ValidateExprHandler))) subrouter.Path("/user_stats").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.UserStatsHandler))) diff --git a/cmd/querier/main.go b/cmd/querier/main.go index 943399c26ac..9429746a22b 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -11,7 +11,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/web/api/v1" "github.com/prometheus/tsdb" @@ -88,9 +87,7 @@ func main() { } defer chunkStore.Stop() - queryable := querier.NewQueryable(dist, chunkStore) - - engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout) + queryable, engine := querier.Make(querierConfig, dist, chunkStore) api := v1.NewAPI( engine, queryable, @@ -107,7 +104,7 @@ func main() { subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter() subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter)) - subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(queryable.RemoteReadHandler))) + subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(querier.RemoteReadHandler(queryable))) subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler))) subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler))) diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index 4601d23725f..f480e4c8c22 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -36,6 +36,7 @@ func main() { schemaConfig chunk.SchemaConfig storageConfig storage.Config configStoreConfig ruler.ConfigStoreConfig + querierConfig querier.Config logLevel util.LogLevel ) @@ -44,7 +45,8 @@ func main() { defer trace.Close() util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, - &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel) + &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, + &querierConfig, &logLevel) flag.Parse() util.InitLogger(logLevel.AllowedLevel) @@ -78,7 +80,7 @@ func main() { prometheus.MustRegister(dist) engine := promql.NewEngine(util.Logger, prometheus.DefaultRegisterer, rulerConfig.NumWorkers, rulerConfig.GroupTimeout) - queryable := querier.NewQueryable(dist, chunkStore) + queryable := querier.NewQueryable(dist, chunkStore, querierConfig.Iterators) rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist) if err != nil { diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 87343017a8a..7bd15c8b711 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -319,36 +319,31 @@ func equalByKey(a, b Chunk) bool { a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum } -func chunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) { +// ChunksToMatrix converts a set of chunks to a model.Matrix. +func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) { sp, ctx := ot.StartSpanFromContext(ctx, "chunksToMatrix") defer sp.Finish() sp.LogFields(otlog.Int("chunks", len(chunks))) // Group chunks by series, sort and dedupe samples. - sampleStreams := map[model.Fingerprint]*model.SampleStream{} + metrics := map[model.Fingerprint]model.Metric{} + samplesBySeries := map[model.Fingerprint][][]model.SamplePair{} for _, c := range chunks { - ss, ok := sampleStreams[c.Fingerprint] - if !ok { - ss = &model.SampleStream{ - Metric: c.Metric, - } - sampleStreams[c.Fingerprint] = ss - } - - samples, err := c.Samples(from, through) + ss, err := c.Samples(from, through) if err != nil { return nil, err } - ss.Values = util.MergeSampleSets(ss.Values, samples) + metrics[c.Fingerprint] = c.Metric + samplesBySeries[c.Fingerprint] = append(samplesBySeries[c.Fingerprint], ss) } - sp.LogFields(otlog.Int("sample streams", len(sampleStreams))) + sp.LogFields(otlog.Int("series", len(samplesBySeries))) - matrix := make(model.Matrix, 0, len(sampleStreams)) - for _, ss := range sampleStreams { + matrix := make(model.Matrix, 0, len(samplesBySeries)) + for fp, ss := range samplesBySeries { matrix = append(matrix, &model.SampleStream{ - Metric: ss.Metric, - Values: ss.Values, + Metric: metrics[fp], + Values: util.MergeNSampleSets(ss...), }) } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 84bf658b3ea..b7d59755328 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -195,7 +195,7 @@ func (s *spanLogger) Log(kvps ...interface{}) error { } // Get implements ChunkStore -func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) { +func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) { log, ctx := newSpanLogger(ctx, "ChunkStore.Get") defer log.Span.Finish() @@ -227,19 +227,11 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers . metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) if ok && metricNameMatcher.Type == labels.MatchEqual { log.Span.SetTag("metric", metricNameMatcher.Value) - return c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Value) + return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value) } // Otherwise we consult the metric name index first and then create queries for each matching metric name. - return c.getSeriesMatrix(ctx, from, through, matchers, metricNameMatcher) -} - -func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) (model.Matrix, error) { - chunks, err := c.getMetricNameChunks(ctx, from, through, allMatchers, metricName) - if err != nil { - return nil, err - } - return chunksToMatrix(ctx, chunks, from, through) + return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher) } func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { @@ -345,7 +337,7 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [ return } -func (c *Store) getSeriesMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) (model.Matrix, error) { +func (c *Store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) { // Get all series from the index userID, err := user.ExtractOrgID(ctx) if err != nil { @@ -406,7 +398,7 @@ outer: } } } - return chunksToMatrix(ctx, chunks, from, through) + return chunks, nil } func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index b47a92d1347..9bf523de0b3 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -218,7 +218,10 @@ func TestChunkStore_Get(t *testing.T) { } // Query with ordinary time-range - matrix1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) + chunks1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) + require.NoError(t, err) + + matrix1, err := ChunksToMatrix(ctx, chunks1, now.Add(-time.Hour), now) require.NoError(t, err) sort.Sort(ByFingerprint(matrix1)) @@ -229,7 +232,10 @@ func TestChunkStore_Get(t *testing.T) { } // Pushing end of time-range into future should yield exact same resultset - matrix2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...) + chunks2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...) + require.NoError(t, err) + + matrix2, err := ChunksToMatrix(ctx, chunks2, now.Add(-time.Hour), now) require.NoError(t, err) sort.Sort(ByFingerprint(matrix2)) diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go index 040d01c5110..b486565dfc8 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -188,7 +188,7 @@ func TestChunksToMatrix(t *testing.T) { }, }, } { - matrix, err := chunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through) + matrix, err := ChunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through) require.NoError(t, err) sort.Sort(matrix) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 835ab052cd0..8fb57e80183 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3,6 +3,7 @@ package ingester import ( "context" "fmt" + "math" "net/http" "sort" "sync" @@ -18,7 +19,6 @@ import ( "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/ingester/client" - "github.com/weaveworks/cortex/pkg/util" ) type testStore struct { @@ -86,36 +86,6 @@ func matrixToSamples(m model.Matrix) []model.Sample { return samples } -// chunksToMatrix converts a slice of chunks into a model.Matrix. -func chunksToMatrix(chunks []chunk.Chunk) (model.Matrix, error) { - // Group chunks by series, sort and dedupe samples. - sampleStreams := map[model.Fingerprint]*model.SampleStream{} - for _, c := range chunks { - fp := c.Metric.Fingerprint() - ss, ok := sampleStreams[fp] - if !ok { - ss = &model.SampleStream{ - Metric: c.Metric, - } - sampleStreams[fp] = ss - } - - samples, err := c.Samples(c.From, c.Through) - if err != nil { - return nil, err - } - - ss.Values = util.MergeSampleSets(ss.Values, samples) - } - - matrix := make(model.Matrix, 0, len(sampleStreams)) - for _, ss := range sampleStreams { - matrix = append(matrix, ss) - } - - return matrix, nil -} - func TestIngesterAppend(t *testing.T) { store, ing := newTestStore(t, defaultIngesterTestConfig()) @@ -154,7 +124,7 @@ func TestIngesterAppend(t *testing.T) { // Read samples back via chunk store. ing.Shutdown() for _, userID := range userIDs { - res, err := chunksToMatrix(store.chunks[userID]) + res, err := chunk.ChunksToMatrix(context.Background(), store.chunks[userID], model.Time(0), model.Time(math.MaxInt64)) require.NoError(t, err) sort.Sort(res) assert.Equal(t, testData[userID], res) diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index bce5b745501..1de9249a80f 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -2,6 +2,7 @@ package ingester import ( "io" + "math" "reflect" "runtime" "testing" @@ -17,6 +18,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/ingester/client" "github.com/weaveworks/cortex/pkg/ring" "github.com/weaveworks/cortex/pkg/util" @@ -314,7 +316,7 @@ func TestIngesterFlush(t *testing.T) { }) // And check the store has the chunk - res, err := chunksToMatrix(store.chunks[userID]) + res, err := chunk.ChunksToMatrix(context.Background(), store.chunks[userID], model.Time(0), model.Time(math.MaxInt64)) require.NoError(t, err) assert.Equal(t, model.Matrix{ &model.SampleStream{ diff --git a/pkg/querier/benchmark_test.go b/pkg/querier/benchmark_test.go new file mode 100644 index 00000000000..a674db7eb91 --- /dev/null +++ b/pkg/querier/benchmark_test.go @@ -0,0 +1,27 @@ +package querier + +import ( + "fmt" + "testing" + + "github.com/prometheus/prometheus/promql" +) + +var result *promql.Result + +func BenchmarkChunkQueryable(b *testing.B) { + for _, encoding := range encodings { + store, from := makeMockChunkStore(b, 24*30, encoding.e) + + for _, q := range queryables { + b.Run(fmt.Sprintf("%s/%s", q.name, encoding.name), func(b *testing.B) { + queryable := q.f(store) + var r *promql.Result + for n := 0; n < b.N; n++ { + r = testQuery(b, queryable, from) + } + result = r + }) + } + } +} diff --git a/pkg/querier/chunk_iterator.go b/pkg/querier/chunk_iterator.go new file mode 100644 index 00000000000..5a57a080c01 --- /dev/null +++ b/pkg/querier/chunk_iterator.go @@ -0,0 +1,59 @@ +package querier + +import ( + "github.com/prometheus/common/model" + "github.com/weaveworks/cortex/pkg/chunk" + promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" +) + +type chunkIterator struct { + chunk.Chunk + it promchunk.Iterator + + // At() is called often in the heap code, so caching its result seems like + // a good idea. + cacheValid bool + cachedTime int64 + cachedValue float64 +} + +// Seek advances the iterator forward to the value at or after +// the given timestamp. +func (i *chunkIterator) Seek(t int64) bool { + i.cacheValid = false + + // We assume seeks only care about a specific window; if this chunk doesn't + // contain samples in that window, we can shortcut. + if int64(i.Through) < t { + return false + } + + return i.it.FindAtOrAfter(model.Time(t)) +} + +func (i *chunkIterator) AtTime() int64 { + if !i.cacheValid { + v := i.it.Value() + i.cachedTime, i.cachedValue = int64(v.Timestamp), float64(v.Value) + i.cacheValid = true + } + return i.cachedTime +} + +func (i *chunkIterator) At() (int64, float64) { + if !i.cacheValid { + v := i.it.Value() + i.cachedTime, i.cachedValue = int64(v.Timestamp), float64(v.Value) + i.cacheValid = true + } + return i.cachedTime, i.cachedValue +} + +func (i *chunkIterator) Next() bool { + i.cacheValid = false + return i.it.Scan() +} + +func (i *chunkIterator) Err() error { + return i.it.Err() +} diff --git a/pkg/querier/chunk_merge_iterator.go b/pkg/querier/chunk_merge_iterator.go new file mode 100644 index 00000000000..7bd679fd7de --- /dev/null +++ b/pkg/querier/chunk_merge_iterator.go @@ -0,0 +1,155 @@ +package querier + +import ( + "container/heap" + "sort" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" + "github.com/weaveworks/cortex/pkg/chunk" +) + +// Limit on the window size of seeks. +const window = 24 * time.Hour +const chunkSize = 12 * time.Hour + +type chunkMergeIterator struct { + chunks []*chunkIterator + h seriesIteratorHeap + + curr *chunkIterator + lastErr error +} + +func newChunkMergeIterator(cs []chunk.Chunk) storage.SeriesIterator { + chunks := make([]*chunkIterator, len(cs), len(cs)) + for i := range cs { + chunks[i] = &chunkIterator{ + Chunk: cs[i], + it: cs[i].Data.NewIterator(), + } + } + sort.Sort(byFrom(chunks)) + + c := &chunkMergeIterator{ + chunks: chunks, + h: make(seriesIteratorHeap, 0, len(chunks)), + } + + for _, iter := range c.chunks { + if iter.Next() { + heap.Push(&c.h, iter) + } else if err := iter.Err(); err != nil { + c.lastErr = err + } + } + return c +} + +func (c *chunkMergeIterator) findChunks(t int64) []*chunkIterator { + // Find beginning and end index into list of chunks. + i := sort.Search(len(c.chunks), func(i int) bool { + return c.chunks[i].From.Add(chunkSize) >= model.Time(t) + }) + j := sort.Search(len(c.chunks), func(i int) bool { + return model.Time(t).Add(window) <= c.chunks[i].From + }) + return c.chunks[i:j] +} + +func (c *chunkMergeIterator) Seek(t int64) bool { + chunks := c.findChunks(t) + c.curr = nil + c.h = c.h[:0] + + for _, iter := range chunks { + if iter.Seek(t) { + heap.Push(&c.h, iter) + } else if err := iter.Err(); err != nil { + c.lastErr = err + return false + } + } + + return c.popAndDedupe() +} + +func (c *chunkMergeIterator) Next() bool { + if c.curr != nil { + if c.curr.Next() { + heap.Push(&c.h, c.curr) + } else if err := c.curr.Err(); err != nil { + c.lastErr = err + return false + } + c.curr = nil + } + + return c.popAndDedupe() +} + +func (c *chunkMergeIterator) popAndDedupe() bool { + if len(c.h) == 0 { + return false + } + + c.curr = heap.Pop(&c.h).(*chunkIterator) + for len(c.h) > 0 { + next := c.h[0] + if next.AtTime() != c.curr.AtTime() { + break + } + + if next.Next() { + heap.Fix(&c.h, 0) + continue + } + + heap.Pop(&c.h) + if err := next.Err(); err != nil { + c.lastErr = err + return false + } + } + return true +} + +func (c *chunkMergeIterator) At() (t int64, v float64) { + if c.curr == nil { + panic("mergeIterator.At() called after .Next() returned false.") + } + + return c.curr.At() +} + +func (c *chunkMergeIterator) Err() error { + return c.lastErr +} + +type seriesIteratorHeap []*chunkIterator + +func (h seriesIteratorHeap) Len() int { return len(h) } +func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h seriesIteratorHeap) Less(i, j int) bool { + return h[i].AtTime() < h[j].AtTime() +} + +func (h *seriesIteratorHeap) Push(x interface{}) { + *h = append(*h, x.(*chunkIterator)) +} + +func (h *seriesIteratorHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +type byFrom []*chunkIterator + +func (b byFrom) Len() int { return len(b) } +func (b byFrom) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b byFrom) Less(i, j int) bool { return b[i].From < b[j].From } diff --git a/pkg/querier/chunk_merge_iterator_test.go b/pkg/querier/chunk_merge_iterator_test.go new file mode 100644 index 00000000000..ff22fcf7d37 --- /dev/null +++ b/pkg/querier/chunk_merge_iterator_test.go @@ -0,0 +1,89 @@ +package querier + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/weaveworks/cortex/pkg/chunk" + promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" +) + +func TestChunkMergeIterator(t *testing.T) { + for i, tc := range []struct { + chunks []chunk.Chunk + mint, maxt int64 + }{ + { + chunks: []chunk.Chunk{ + mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.Varbit), + }, + maxt: 100, + }, + + { + chunks: []chunk.Chunk{ + mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.Varbit), + mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.Varbit), + }, + maxt: 100, + }, + + { + chunks: []chunk.Chunk{ + mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.Varbit), + mkChunk(t, 50, 150, 1*time.Millisecond, promchunk.Varbit), + mkChunk(t, 100, 200, 1*time.Millisecond, promchunk.Varbit), + }, + maxt: 200, + }, + + { + chunks: []chunk.Chunk{ + mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.Varbit), + mkChunk(t, 100, 200, 1*time.Millisecond, promchunk.Varbit), + }, + maxt: 200, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + iter := newChunkMergeIterator(tc.chunks) + for i := tc.mint; i < tc.maxt; i++ { + require.True(t, iter.Next()) + ts, s := iter.At() + assert.Equal(t, i, ts) + assert.Equal(t, float64(i), s) + assert.NoError(t, iter.Err()) + } + assert.False(t, iter.Next()) + }) + } +} + +func TestChunkMergeIteratorSeek(t *testing.T) { + iter := newChunkMergeIterator([]chunk.Chunk{ + mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.Varbit), + mkChunk(t, 50, 150, 1*time.Millisecond, promchunk.Varbit), + mkChunk(t, 100, 200, 1*time.Millisecond, promchunk.Varbit), + }) + + for i := int64(0); i < 10; i += 20 { + require.True(t, iter.Seek(i)) + ts, s := iter.At() + assert.Equal(t, i, ts) + assert.Equal(t, float64(i), s) + assert.NoError(t, iter.Err()) + + for j := i + 1; j < 200; j++ { + require.True(t, iter.Next()) + ts, s := iter.At() + assert.Equal(t, j, ts) + assert.Equal(t, float64(j), s) + assert.NoError(t, iter.Err()) + } + assert.False(t, iter.Next()) + } +} diff --git a/pkg/querier/chunk_queryable.go b/pkg/querier/chunk_queryable.go new file mode 100644 index 00000000000..e1138e58ca8 --- /dev/null +++ b/pkg/querier/chunk_queryable.go @@ -0,0 +1,106 @@ +package querier + +import ( + "context" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/weaveworks/cortex/pkg/chunk" +) + +// ChunkStore is the read-interface to the Chunk Store. Made an interface here +// to reduce package coupling. +type ChunkStore interface { + Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) +} + +// NewQueryable creates a new Queryable for cortex. +func NewQueryable(distributor Distributor, chunkStore ChunkStore, iterators bool) storage.Queryable { + dq := newDistributorQueryable(distributor) + cq := newChunkQueryable(chunkStore) + if iterators { + cq = newIterChunkQueryable(chunkStore) + } + + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + dqr, err := dq.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + + cqr, err := cq.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + + return querier{ + Querier: storage.NewMergeQuerier([]storage.Querier{dqr, cqr}), + distributor: distributor, + ctx: ctx, + mint: mint, + maxt: maxt, + }, nil + }) +} + +type querier struct { + storage.Querier + + distributor Distributor + ctx context.Context + mint, maxt int64 +} + +func (q querier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { + // Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, + // which needs only metadata. + if sp != nil { + return q.Querier.Select(sp, matchers...) + } + + ms, err := q.distributor.MetricsForLabelMatchers(q.ctx, model.Time(q.mint), model.Time(q.maxt), matchers...) + if err != nil { + return nil, err + } + return metricsToSeriesSet(ms), nil +} + +func newChunkQueryable(store ChunkStore) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &chunkQuerier{ + store: store, + ctx: ctx, + mint: mint, + maxt: maxt, + }, nil + }) +} + +type chunkQuerier struct { + store ChunkStore + ctx context.Context + mint, maxt int64 +} + +func (c chunkQuerier) Select(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { + chunks, err := c.store.Get(c.ctx, model.Time(c.mint), model.Time(c.maxt), matchers...) + if err != nil { + return nil, err + } + + matrix, err := chunk.ChunksToMatrix(c.ctx, chunks, model.Time(c.mint), model.Time(c.maxt)) + if err != nil { + return nil, err + } + + return matrixToSeriesSet(matrix), nil +} + +func (c chunkQuerier) LabelValues(name string) ([]string, error) { + return nil, nil +} + +func (c chunkQuerier) Close() error { + return nil +} diff --git a/pkg/querier/chunk_queryable_iter.go b/pkg/querier/chunk_queryable_iter.go new file mode 100644 index 00000000000..b23f36f2f1e --- /dev/null +++ b/pkg/querier/chunk_queryable_iter.go @@ -0,0 +1,74 @@ +package querier + +import ( + "context" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + + "github.com/weaveworks/cortex/pkg/chunk" +) + +func newIterChunkQueryable(store ChunkStore) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &iterChunkQuerier{ + store: store, + ctx: ctx, + mint: mint, + maxt: maxt, + }, nil + }) +} + +type iterChunkQuerier struct { + store ChunkStore + ctx context.Context + mint, maxt int64 +} + +func (q *iterChunkQuerier) Select(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { + chunks, err := q.store.Get(q.ctx, model.Time(q.mint), model.Time(q.maxt), matchers...) + if err != nil { + return nil, promql.ErrStorage(err) + } + + chunksBySeries := map[model.Fingerprint][]chunk.Chunk{} + for _, c := range chunks { + fp := c.Metric.Fingerprint() + chunksBySeries[fp] = append(chunksBySeries[fp], c) + } + + series := make([]storage.Series, 0, len(chunksBySeries)) + for i := range chunksBySeries { + series = append(series, &chunkSeries{ + labels: metricToLabels(chunksBySeries[i][0].Metric), + chunks: chunksBySeries[i], + }) + } + + return newConcreteSeriesSet(series), nil +} + +func (q *iterChunkQuerier) LabelValues(name string) ([]string, error) { + return nil, nil +} + +func (q *iterChunkQuerier) Close() error { + return nil +} + +type chunkSeries struct { + labels labels.Labels + chunks []chunk.Chunk +} + +func (s *chunkSeries) Labels() labels.Labels { + return s.labels +} + +// Iterator returns a new iterator of the data of the series. +func (s *chunkSeries) Iterator() storage.SeriesIterator { + return newChunkMergeIterator(s.chunks) +} diff --git a/pkg/querier/chunk_queryable_test.go b/pkg/querier/chunk_queryable_test.go new file mode 100644 index 00000000000..b02e3981ac0 --- /dev/null +++ b/pkg/querier/chunk_queryable_test.go @@ -0,0 +1,120 @@ +package querier + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/weaveworks/cortex/pkg/chunk" + promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" + "github.com/weaveworks/cortex/pkg/util" +) + +const ( + userID = "userID" + fp = 1 + chunkOffset = 6 * time.Minute + chunkLength = 3 * time.Hour + sampleRate = 15 * time.Second + samplesPerChunk = chunkLength / sampleRate +) + +var ( + queryables = []struct { + name string + f func(ChunkStore) storage.Queryable + }{ + {"matrixes", newChunkQueryable}, + {"iterators", newIterChunkQueryable}, + } + + encodings = []struct { + name string + e promchunk.Encoding + }{ + {"DoubleDelta", promchunk.DoubleDelta}, + {"Varbit", promchunk.Varbit}, + } +) + +func TestChunkQueryable(t *testing.T) { + for _, q := range queryables { + for _, encoding := range encodings { + t.Run(fmt.Sprintf("%s/%s", q.name, encoding.name), func(t *testing.T) { + store, from := makeMockChunkStore(t, 24*30, encoding.e) + queryable := q.f(store) + testQuery(t, queryable, from) + }) + } + } +} + +type mockChunkStore struct { + chunks []chunk.Chunk +} + +func (m mockChunkStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) { + return m.chunks, nil +} + +func makeMockChunkStore(t require.TestingT, numChunks int, encoding promchunk.Encoding) (ChunkStore, model.Time) { + var ( + chunks = make([]chunk.Chunk, 0, numChunks) + from = model.Time(0) + ) + for i := 0; i < numChunks; i++ { + c := mkChunk(t, from, from.Add(samplesPerChunk*sampleRate), sampleRate, encoding) + chunks = append(chunks, c) + from = from.Add(chunkOffset) + } + return mockChunkStore{chunks}, from +} + +func mkChunk(t require.TestingT, mint, maxt model.Time, step time.Duration, encoding promchunk.Encoding) chunk.Chunk { + metric := model.Metric{ + model.MetricNameLabel: "foo", + } + pc, err := promchunk.NewForEncoding(encoding) + require.NoError(t, err) + for i := mint; i.Before(maxt); i = i.Add(step) { + pcs, err := pc.Add(model.SamplePair{ + Timestamp: i, + Value: model.SampleValue(float64(i)), + }) + require.NoError(t, err) + require.Len(t, pcs, 1) + pc = pcs[0] + } + return chunk.NewChunk(userID, fp, metric, pc, mint, maxt) +} + +func testQuery(t require.TestingT, queryable storage.Queryable, end model.Time) *promql.Result { + from, through, step := time.Unix(0, 0), end.Time(), sampleRate*4 + engine := promql.NewEngine(util.Logger, nil, 10, 1*time.Minute) + query, err := engine.NewRangeQuery(queryable, "rate(foo[1m])", from, through, step) + require.NoError(t, err) + + r := query.Exec(context.Background()) + m, err := r.Matrix() + require.NoError(t, err) + require.Len(t, m, 1) + + series := m[0] + assert.Equal(t, labels.Labels{}, series.Metric) + assert.Equal(t, int(through.Sub(from)/step), len(series.Points)) + ts := int64(step / time.Millisecond) + for _, point := range series.Points { + assert.Equal(t, ts, point.T) + assert.Equal(t, 1000.0, point.V) + ts += int64(step / time.Millisecond) + } + return r +} diff --git a/pkg/querier/config.go b/pkg/querier/config.go new file mode 100644 index 00000000000..f19f784a973 --- /dev/null +++ b/pkg/querier/config.go @@ -0,0 +1,36 @@ +package querier + +import ( + "flag" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + + "github.com/weaveworks/cortex/pkg/util" +) + +// Config contains the configuration require to create a querier +type Config struct { + MaxConcurrent int + Timeout time.Duration + Iterators bool +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.") + f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.") + if f.Lookup("promql.lookback-delta") == nil { + f.DurationVar(&promql.LookbackDelta, "promql.lookback-delta", promql.LookbackDelta, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") + } + f.BoolVar(&cfg.Iterators, "querier.iterators", false, "Use iterators to execute query, as opposed to fully materialising the series in memory.") +} + +// Make builds a queryable and promql engine. +func Make(cfg Config, distributor Distributor, chunkStore ChunkStore) (storage.Queryable, *promql.Engine) { + queryable := NewQueryable(distributor, chunkStore, cfg.Iterators) + engine := promql.NewEngine(util.Logger, prometheus.DefaultRegisterer, cfg.MaxConcurrent, cfg.Timeout) + return queryable, engine +} diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go new file mode 100644 index 00000000000..5c84c0ba166 --- /dev/null +++ b/pkg/querier/distributor_queryable.go @@ -0,0 +1,63 @@ +package querier + +import ( + "context" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + + "github.com/weaveworks/cortex/pkg/prom1/storage/metric" +) + +// Distributor is the read interface to the distributor, made an interface here +// to reduce package coupling. +type Distributor interface { + Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) + LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error) + MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) +} + +func newDistributorQueryable(distributor Distributor) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &distributorQuerier{ + distributor: distributor, + ctx: ctx, + mint: mint, + maxt: maxt, + }, nil + }) +} + +type distributorQuerier struct { + distributor Distributor + ctx context.Context + mint, maxt int64 +} + +func (q *distributorQuerier) Select(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { + matrix, err := q.distributor.Query(q.ctx, model.Time(q.mint), model.Time(q.maxt), matchers...) + if err != nil { + return nil, promql.ErrStorage(err) + } + + return matrixToSeriesSet(matrix), nil +} + +func (q *distributorQuerier) LabelValues(name string) ([]string, error) { + values, err := q.distributor.LabelValuesForLabelName(q.ctx, model.LabelName(name)) + if err != nil { + return nil, err + } + + result := make([]string, len(values), len(values)) + for i := 0; i < len(values); i++ { + result[i] = string(values[i]) + } + return result, nil +} + +func (q *distributorQuerier) Close() error { + return nil +} diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go new file mode 100644 index 00000000000..1bc3ff7bac9 --- /dev/null +++ b/pkg/querier/distributor_queryable_test.go @@ -0,0 +1,64 @@ +package querier + +import ( + "context" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/prom1/storage/metric" +) + +const ( + maxt, mint = 0, 10 +) + +func TestDistributorQuerier(t *testing.T) { + d := &mockDistributor{ + m: model.Matrix{ + // Matrixes are unsorted, so this tests that the labels get sorted. + &model.SampleStream{ + Metric: model.Metric{ + "foo": "bar", + }, + }, + &model.SampleStream{ + Metric: model.Metric{ + "bar": "baz", + }, + }, + }, + } + queryable := newDistributorQueryable(d) + querier, err := queryable.Querier(context.Background(), mint, maxt) + require.NoError(t, err) + + seriesSet, err := querier.Select(nil) + require.NoError(t, err) + + require.True(t, seriesSet.Next()) + series := seriesSet.At() + require.Equal(t, labels.Labels{{Name: "bar", Value: "baz"}}, series.Labels()) + + require.True(t, seriesSet.Next()) + series = seriesSet.At() + require.Equal(t, labels.Labels{{Name: "foo", Value: "bar"}}, series.Labels()) + + require.False(t, seriesSet.Next()) + require.NoError(t, seriesSet.Err()) +} + +type mockDistributor struct { + m model.Matrix +} + +func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { + return m.m, nil +} +func (m *mockDistributor) LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error) { + return nil, nil +} +func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { + return nil, nil +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go deleted file mode 100644 index 797726a132d..00000000000 --- a/pkg/querier/querier.go +++ /dev/null @@ -1,281 +0,0 @@ -package querier - -import ( - "context" - "flag" - "net/http" - "time" - - "github.com/go-kit/kit/log/level" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage" - - "github.com/weaveworks/cortex/pkg/ingester/client" - "github.com/weaveworks/cortex/pkg/prom1/storage/metric" - - "github.com/weaveworks/cortex/pkg/util" -) - -// Config contains the configuration require to create a querier -type Config struct { - MaxConcurrent int - Timeout time.Duration -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - flag.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.") - flag.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.") - if flag.Lookup("promql.lookback-delta") == nil { - flag.DurationVar(&promql.LookbackDelta, "promql.lookback-delta", promql.LookbackDelta, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") - } -} - -// ChunkStore is the interface we need to get chunks -type ChunkStore interface { - Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) (model.Matrix, error) -} - -// NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor Querier, chunkStore ChunkStore) MergeQueryable { - return MergeQueryable{ - queriers: []Querier{ - distributor, - &chunkQuerier{ - store: chunkStore, - }, - }, - } -} - -// A Querier allows querying an underlying storage for time series samples or metadata. -type Querier interface { - Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) - MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) - LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error) -} - -// A chunkQuerier is a Querier that fetches samples from a ChunkStore. -type chunkQuerier struct { - store ChunkStore -} - -// Query implements Querier and transforms a list of chunks into sample -// matrices. -func (q *chunkQuerier) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { - // Get iterators for all matching series from ChunkStore. - matrix, err := q.store.Get(ctx, from, to, matchers...) - if err != nil { - return nil, promql.ErrStorage(err) - } - - return matrix, nil -} - -// LabelValuesForLabelName returns all of the label values that are associated with a given label name. -func (q *chunkQuerier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) { - // TODO: Support querying historical label values at some point? - return nil, nil -} - -// MetricsForLabelMatchers is a noop for chunk querier. -func (q *chunkQuerier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...*labels.Matcher) ([]metric.Metric, error) { - return nil, nil -} - -func mergeMatrices(matrices chan model.Matrix, errors chan error, n int) (model.Matrix, error) { - // Group samples from all matrices by fingerprint. - fpToSS := map[model.Fingerprint]*model.SampleStream{} - var lastErr error - for i := 0; i < n; i++ { - select { - case err := <-errors: - lastErr = err - - case matrix := <-matrices: - for _, ss := range matrix { - fp := ss.Metric.Fingerprint() - if fpSS, ok := fpToSS[fp]; !ok { - fpToSS[fp] = ss - } else { - fpSS.Values = util.MergeSampleSets(fpSS.Values, ss.Values) - } - } - } - } - if lastErr != nil { - return nil, lastErr - } - - matrix := make(model.Matrix, 0, len(fpToSS)) - for _, ss := range fpToSS { - matrix = append(matrix, ss) - } - return matrix, nil -} - -// A MergeQueryable is a storage.Queryable that produces a storage.Querier which merges -// results from multiple underlying Queriers. -type MergeQueryable struct { - queriers []Querier -} - -// Querier implements storage.Queryable. -func (q MergeQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return mergeQuerier{ - ctx: ctx, - queriers: q.queriers, - mint: mint, - maxt: maxt, - }, nil -} - -// RemoteReadHandler handles Prometheus remote read requests. -func (q MergeQueryable) RemoteReadHandler(w http.ResponseWriter, r *http.Request) { - compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Read-Version")) - - ctx := r.Context() - var req client.ReadRequest - logger := util.WithContext(r.Context(), util.Logger) - if _, err := util.ParseProtoReader(ctx, r.Body, &req, compressionType); err != nil { - level.Error(logger).Log("err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - // Fetch samples for all queries in parallel. - resp := client.ReadResponse{ - Results: make([]*client.QueryResponse, len(req.Queries)), - } - errors := make(chan error) - for i, qr := range req.Queries { - go func(i int, qr *client.QueryRequest) { - from, to, matchers, err := client.FromQueryRequest(qr) - if err != nil { - errors <- err - return - } - - querier, err := q.Querier(ctx, int64(from), int64(to)) - if err != nil { - errors <- err - return - } - - matrix, err := querier.(mergeQuerier).selectSamplesMatrix(matchers...) - if err != nil { - errors <- err - return - } - - resp.Results[i] = client.ToQueryResponse(matrix) - errors <- nil - }(i, qr) - } - - var lastErr error - for range req.Queries { - err := <-errors - if err != nil { - lastErr = err - } - } - if lastErr != nil { - http.Error(w, lastErr.Error(), http.StatusBadRequest) - return - } - - if err := util.SerializeProtoResponse(w, &resp, compressionType); err != nil { - level.Error(logger).Log("msg", "error sending remote read response", "err", err) - } -} - -type mergeQuerier struct { - ctx context.Context - queriers []Querier - mint int64 - maxt int64 -} - -func (mq mergeQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { - // TODO: Update underlying selectors to return errors directly. - // Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, which needs only metadata - if sp == nil { - return mq.selectMetadata(matchers...), nil - } - return mq.selectSamples(matchers...), nil -} - -func (mq mergeQuerier) selectMetadata(matchers ...*labels.Matcher) storage.SeriesSet { - // NB that we don't do this in parallel, as in practice we only have two queriers, - // one of which is the chunk store, which doesn't implement this yet. - seriesSets := make([]storage.SeriesSet, 0, len(mq.queriers)) - for _, q := range mq.queriers { - ms, err := q.MetricsForLabelMatchers(mq.ctx, model.Time(mq.mint), model.Time(mq.maxt), matchers...) - if err != nil { - return errSeriesSet{err: err} - } - ss := metricsToSeriesSet(ms) - seriesSets = append(seriesSets, ss) - } - - return storage.NewMergeSeriesSet(seriesSets) -} - -func (mq mergeQuerier) selectSamples(matchers ...*labels.Matcher) storage.SeriesSet { - matrix, err := mq.selectSamplesMatrix(matchers...) - if err != nil { - return errSeriesSet{ - err: err, - } - } - return matrixToSeriesSet(matrix) -} - -func (mq mergeQuerier) selectSamplesMatrix(matchers ...*labels.Matcher) (model.Matrix, error) { - incomingMatrices := make(chan model.Matrix) - incomingErrors := make(chan error) - - for _, q := range mq.queriers { - go func(q Querier) { - matrix, err := q.Query(mq.ctx, model.Time(mq.mint), model.Time(mq.maxt), matchers...) - if err != nil { - incomingErrors <- err - } else { - incomingMatrices <- matrix - } - }(q) - } - - mergedMatrix, err := mergeMatrices(incomingMatrices, incomingErrors, len(mq.queriers)) - if err != nil { - level.Error(util.WithContext(mq.ctx, util.Logger)).Log("msg", "error in mergeQuerier.selectSamples", "err", err) - return nil, err - } - return mergedMatrix, nil -} - -func (mq mergeQuerier) LabelValues(name string) ([]string, error) { - valueSet := map[string]struct{}{} - for _, q := range mq.queriers { - vals, err := q.LabelValuesForLabelName(mq.ctx, model.LabelName(name)) - if err != nil { - return nil, err - } - for _, v := range vals { - valueSet[string(v)] = struct{}{} - } - } - - values := make([]string, 0, len(valueSet)) - for v := range valueSet { - values = append(values, v) - } - return values, nil -} - -func (mq mergeQuerier) Close() error { - return nil -} diff --git a/pkg/querier/remote_read.go b/pkg/querier/remote_read.go new file mode 100644 index 00000000000..c1dd260fb74 --- /dev/null +++ b/pkg/querier/remote_read.go @@ -0,0 +1,105 @@ +package querier + +import ( + "net/http" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" + "github.com/weaveworks/cortex/pkg/ingester/client" + "github.com/weaveworks/cortex/pkg/util" +) + +// RemoteReadHandler handles Prometheus remote read requests. +func RemoteReadHandler(q storage.Queryable) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Read-Version")) + + ctx := r.Context() + var req client.ReadRequest + logger := util.WithContext(r.Context(), util.Logger) + if _, err := util.ParseProtoReader(ctx, r.Body, &req, compressionType); err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Fetch samples for all queries in parallel. + resp := client.ReadResponse{ + Results: make([]*client.QueryResponse, len(req.Queries)), + } + errors := make(chan error) + for i, qr := range req.Queries { + go func(i int, qr *client.QueryRequest) { + from, to, matchers, err := client.FromQueryRequest(qr) + if err != nil { + errors <- err + return + } + + querier, err := q.Querier(ctx, int64(from), int64(to)) + if err != nil { + errors <- err + return + } + + seriesSet, err := querier.Select(nil, matchers...) + if err != nil { + errors <- err + return + } + + matrix, err := seriesSetToMatrix(seriesSet) + if err != nil { + errors <- err + return + } + + resp.Results[i] = client.ToQueryResponse(matrix) + errors <- nil + }(i, qr) + } + + var lastErr error + for range req.Queries { + err := <-errors + if err != nil { + lastErr = err + } + } + if lastErr != nil { + http.Error(w, lastErr.Error(), http.StatusBadRequest) + return + } + + if err := util.SerializeProtoResponse(w, &resp, compressionType); err != nil { + level.Error(logger).Log("msg", "error sending remote read response", "err", err) + } + }) +} + +func seriesSetToMatrix(s storage.SeriesSet) (model.Matrix, error) { + result := model.Matrix{} + + for s.Next() { + series := s.At() + values := []model.SamplePair{} + it := series.Iterator() + for it.Next() { + t, v := it.At() + values = append(values, model.SamplePair{ + Timestamp: model.Time(t), + Value: model.SampleValue(v), + }) + } + if err := it.Err(); err != nil { + return nil, err + } + result = append(result, &model.SampleStream{ + Metric: labelsToMetric(series.Labels()), + Values: values, + }) + } + + return result, s.Err() +} diff --git a/pkg/querier/querier_test.go b/pkg/querier/remote_read_test.go similarity index 50% rename from pkg/querier/querier_test.go rename to pkg/querier/remote_read_test.go index 620fbb46b3e..1d03e71ce02 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/remote_read_test.go @@ -15,28 +15,26 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" "github.com/weaveworks/cortex/pkg/ingester/client" - "github.com/weaveworks/cortex/pkg/prom1/storage/metric" "github.com/weaveworks/cortex/pkg/util/wire" ) func TestRemoteReadHandler(t *testing.T) { - q := MergeQueryable{ - queriers: []Querier{ - mockQuerier{ - matrix: model.Matrix{ - { - Metric: model.Metric{"foo": "bar"}, - Values: []model.SamplePair{ - {Timestamp: 0, Value: 0}, - {Timestamp: 1, Value: 1}, - {Timestamp: 2, Value: 2}, - {Timestamp: 3, Value: 3}, - }, + q := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return mockQuerier{ + matrix: model.Matrix{ + { + Metric: model.Metric{"foo": "bar"}, + Values: []model.SamplePair{ + {Timestamp: 0, Value: 0}, + {Timestamp: 1, Value: 1}, + {Timestamp: 2, Value: 2}, + {Timestamp: 3, Value: 3}, }, }, }, - }, - } + }, nil + }) + handler := RemoteReadHandler(q) requestBody, err := proto.Marshal(&client.ReadRequest{ Queries: []*client.QueryRequest{ @@ -50,7 +48,7 @@ func TestRemoteReadHandler(t *testing.T) { request.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") recorder := httptest.NewRecorder() - q.RemoteReadHandler(recorder, request) + handler.ServeHTTP(recorder, request) require.Equal(t, 200, recorder.Result().StatusCode) responseBody, err := ioutil.ReadAll(recorder.Result().Body) @@ -86,58 +84,18 @@ func TestRemoteReadHandler(t *testing.T) { require.Equal(t, expected, response) } -func TestMergeQuerierSortsMetricLabels(t *testing.T) { - mq := mergeQuerier{ - ctx: context.Background(), - queriers: []Querier{ - mockQuerier{ - matrix: model.Matrix{ - { - Metric: model.Metric{ - model.MetricNameLabel: "testmetric", - "e": "f", - "a": "b", - "g": "h", - "c": "d", - }, - Values: []model.SamplePair{{Timestamp: 0, Value: 0}}, - }, - }, - }, - }, - mint: 0, - maxt: 0, - } - m, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric") - require.NoError(t, err) - dummyParams := storage.SelectParams{} - ss, err := mq.Select(&dummyParams, m) - require.NoError(t, err) - require.NoError(t, ss.Err()) - ss.Next() - require.NoError(t, ss.Err()) - l := ss.At().Labels() - require.Equal(t, labels.Labels{ - {Name: string(model.MetricNameLabel), Value: "testmetric"}, - {Name: "a", Value: "b"}, - {Name: "c", Value: "d"}, - {Name: "e", Value: "f"}, - {Name: "g", Value: "h"}, - }, l) -} - type mockQuerier struct { matrix model.Matrix } -func (m mockQuerier) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { - return m.matrix, nil +func (m mockQuerier) Select(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { + return matrixToSeriesSet(m.matrix), nil } -func (mockQuerier) LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error) { +func (m mockQuerier) LabelValues(name string) ([]string, error) { return nil, nil } -func (mockQuerier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...*labels.Matcher) ([]metric.Metric, error) { - return nil, nil +func (mockQuerier) Close() error { + return nil } diff --git a/pkg/querier/series_set.go b/pkg/querier/series_set.go index f59b7dea837..9f386572cfc 100644 --- a/pkg/querier/series_set.go +++ b/pkg/querier/series_set.go @@ -48,13 +48,21 @@ type concreteSeriesSet struct { series []storage.Series } +func newConcreteSeriesSet(series []storage.Series) storage.SeriesSet { + sort.Sort(byLabels(series)) + return &concreteSeriesSet{ + cur: -1, + series: series, + } +} + func (c *concreteSeriesSet) Next() bool { c.cur++ - return c.cur-1 < len(c.series) + return c.cur < len(c.series) } func (c *concreteSeriesSet) At() storage.Series { - return c.series[c.cur-1] + return c.series[c.cur] } func (c *concreteSeriesSet) Err() error { @@ -109,19 +117,6 @@ func (c *concreteSeriesIterator) Err() error { return nil } -func metricsToSeriesSet(ms []metric.Metric) storage.SeriesSet { - series := make([]storage.Series, 0, len(ms)) - for _, m := range ms { - series = append(series, &concreteSeries{ - labels: metricToLabels(m.Metric), - samples: nil, - }) - } - return &concreteSeriesSet{ - series: series, - } -} - func matrixToSeriesSet(m model.Matrix) storage.SeriesSet { series := make([]storage.Series, 0, len(m)) for _, ss := range m { @@ -130,9 +125,18 @@ func matrixToSeriesSet(m model.Matrix) storage.SeriesSet { samples: ss.Values, }) } - return &concreteSeriesSet{ - series: series, + return newConcreteSeriesSet(series) +} + +func metricsToSeriesSet(ms []metric.Metric) storage.SeriesSet { + series := make([]storage.Series, 0, len(ms)) + for _, m := range ms { + series = append(series, &concreteSeries{ + labels: metricToLabels(m.Metric), + samples: nil, + }) } + return newConcreteSeriesSet(series) } func metricToLabels(m model.Metric) labels.Labels { @@ -148,3 +152,17 @@ func metricToLabels(m model.Metric) labels.Labels { sort.Sort(ls) return ls } + +func labelsToMetric(ls labels.Labels) model.Metric { + m := make(model.Metric, len(ls)) + for _, l := range ls { + m[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return m +} + +type byLabels []storage.Series + +func (b byLabels) Len() int { return len(b) } +func (b byLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i].Labels(), b[j].Labels()) < 0 } diff --git a/pkg/querier/series_set_test.go b/pkg/querier/series_set_test.go index e006f3d257e..9f5e3528014 100644 --- a/pkg/querier/series_set_test.go +++ b/pkg/querier/series_set_test.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" ) func TestConcreteSeriesSet(t *testing.T) { @@ -17,22 +18,37 @@ func TestConcreteSeriesSet(t *testing.T) { labels: labels.FromStrings("foo", "baz"), samples: []model.SamplePair{{Value: 3, Timestamp: 4}}, } - c := &concreteSeriesSet{ - series: []storage.Series{series1, series2}, - } - if !c.Next() { - t.Fatalf("Expected Next() to be true.") - } - if c.At() != series1 { - t.Fatalf("Unexpected series returned.") - } - if !c.Next() { - t.Fatalf("Expected Next() to be true.") - } - if c.At() != series2 { - t.Fatalf("Unexpected series returned.") - } - if c.Next() { - t.Fatalf("Expected Next() to be false.") - } + c := newConcreteSeriesSet([]storage.Series{series2, series1}) + require.True(t, c.Next()) + require.Equal(t, series1, c.At()) + require.True(t, c.Next()) + require.Equal(t, series2, c.At()) + require.False(t, c.Next()) +} + +func TestMatrixToSeriesSetSortsMetricLabels(t *testing.T) { + matrix := model.Matrix{ + { + Metric: model.Metric{ + model.MetricNameLabel: "testmetric", + "e": "f", + "a": "b", + "g": "h", + "c": "d", + }, + Values: []model.SamplePair{{Timestamp: 0, Value: 0}}, + }, + } + ss := matrixToSeriesSet(matrix) + require.True(t, ss.Next()) + require.NoError(t, ss.Err()) + + l := ss.At().Labels() + require.Equal(t, labels.Labels{ + {Name: string(model.MetricNameLabel), Value: "testmetric"}, + {Name: "a", Value: "b"}, + {Name: "c", Value: "d"}, + {Name: "e", Value: "f"}, + {Name: "g", Value: "h"}, + }, l) } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index ec98c6fd1f6..0d539020d72 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -29,7 +29,7 @@ func newTestRuler(t *testing.T, alertmanagerURL string) *Ruler { // other kinds of tests. engine := promql.NewEngine(nil, nil, 20, 2*time.Minute) - queryable := querier.NewQueryable(nil, nil) + queryable := querier.NewQueryable(nil, nil, false) ruler, err := NewRuler(cfg, engine, queryable, nil) if err != nil { t.Fatal(err) diff --git a/vendor/github.com/prometheus/prometheus/config/config.go b/vendor/github.com/prometheus/prometheus/config/config.go index 746aae3bf7b..b7a566764d7 100644 --- a/vendor/github.com/prometheus/prometheus/config/config.go +++ b/vendor/github.com/prometheus/prometheus/config/config.go @@ -370,6 +370,13 @@ func (c *ScrapeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { } } } + + // Add index to the static config target groups for unique identification + // within scrape pool. + for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs { + tg.Source = fmt.Sprintf("%d", i) + } + return nil } @@ -432,6 +439,13 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er } } } + + // Add index to the static config target groups for unique identification + // within scrape pool. + for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs { + tg.Source = fmt.Sprintf("%d", i) + } + return nil } diff --git a/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go b/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go index 4a7c6cf906e..a0bcf134611 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go +++ b/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go @@ -108,7 +108,7 @@ type SDConfig struct { // See https://www.consul.io/api/catalog.html#list-services // The list of services for which targets are discovered. // Defaults to all services if empty. - Services []string `yaml:"services"` + Services []string `yaml:"services,omitempty"` // An optional tag used to filter instances inside a service. A single tag is supported // here to match the Consul API. ServiceTag string `yaml:"tag,omitempty"` diff --git a/vendor/github.com/prometheus/prometheus/discovery/file/file.go b/vendor/github.com/prometheus/prometheus/discovery/file/file.go index 780e2581985..be6337822d0 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/file/file.go +++ b/vendor/github.com/prometheus/prometheus/discovery/file/file.go @@ -279,7 +279,7 @@ func (d *Discovery) deleteTimestamp(filename string) { // stop shuts down the file watcher. func (d *Discovery) stop() { - level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", d.paths) + level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", fmt.Sprintf("%v", d.paths)) done := make(chan struct{}) defer close(done) @@ -299,10 +299,10 @@ func (d *Discovery) stop() { } }() if err := d.watcher.Close(); err != nil { - level.Error(d.logger).Log("msg", "Error closing file watcher", "paths", d.paths, "err", err) + level.Error(d.logger).Log("msg", "Error closing file watcher", "paths", fmt.Sprintf("%v", d.paths), "err", err) } - level.Debug(d.logger).Log("File discovery stopped", "paths", d.paths) + level.Debug(d.logger).Log("msg", "File discovery stopped") } // refresh reads all files matching the discovery's patterns and sends the respective diff --git a/vendor/github.com/prometheus/prometheus/discovery/kubernetes/ingress.go b/vendor/github.com/prometheus/prometheus/discovery/kubernetes/ingress.go index 592550212f1..0ff3b0e0a8e 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/kubernetes/ingress.go +++ b/vendor/github.com/prometheus/prometheus/discovery/kubernetes/ingress.go @@ -176,13 +176,22 @@ func (s *Ingress) buildIngress(ingress *v1beta1.Ingress) *targetgroup.Group { } tg.Labels = ingressLabels(ingress) - schema := "http" - if ingress.Spec.TLS != nil { - schema = "https" + tlsHosts := make(map[string]struct{}) + for _, tls := range ingress.Spec.TLS { + for _, host := range tls.Hosts { + tlsHosts[host] = struct{}{} + } } + for _, rule := range ingress.Spec.Rules { paths := pathsFromIngressRule(&rule.IngressRuleValue) + schema := "http" + _, isTLS := tlsHosts[rule.Host] + if isTLS { + schema = "https" + } + for _, path := range paths { tg.Targets = append(tg.Targets, model.LabelSet{ model.AddressLabel: lv(rule.Host), diff --git a/vendor/github.com/prometheus/prometheus/discovery/kubernetes/kubernetes.go b/vendor/github.com/prometheus/prometheus/discovery/kubernetes/kubernetes.go index a55afde0785..faf4087ee42 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/kubernetes/kubernetes.go +++ b/vendor/github.com/prometheus/prometheus/discovery/kubernetes/kubernetes.go @@ -84,13 +84,13 @@ func (c *Role) UnmarshalYAML(unmarshal func(interface{}) error) error { // SDConfig is the configuration for Kubernetes service discovery. type SDConfig struct { - APIServer config_util.URL `yaml:"api_server"` + APIServer config_util.URL `yaml:"api_server,omitempty"` Role Role `yaml:"role"` BasicAuth *config_util.BasicAuth `yaml:"basic_auth,omitempty"` BearerToken config_util.Secret `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` TLSConfig config_util.TLSConfig `yaml:"tls_config,omitempty"` - NamespaceDiscovery NamespaceDiscovery `yaml:"namespaces"` + NamespaceDiscovery NamespaceDiscovery `yaml:"namespaces,omitempty"` } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -250,28 +250,31 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { switch d.role { case RoleEndpoint: for _, namespace := range namespaces { + e := d.client.CoreV1().Endpoints(namespace) elw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return d.client.CoreV1().Endpoints(namespace).List(options) + return e.List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return d.client.CoreV1().Endpoints(namespace).Watch(options) + return e.Watch(options) }, } + s := d.client.CoreV1().Services(namespace) slw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return d.client.CoreV1().Services(namespace).List(options) + return s.List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return d.client.CoreV1().Services(namespace).Watch(options) + return s.Watch(options) }, } + p := d.client.CoreV1().Pods(namespace) plw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return d.client.CoreV1().Pods(namespace).List(options) + return p.List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return d.client.CoreV1().Pods(namespace).Watch(options) + return p.Watch(options) }, } eps := NewEndpoints( @@ -287,12 +290,13 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { } case RolePod: for _, namespace := range namespaces { + p := d.client.CoreV1().Pods(namespace) plw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return d.client.CoreV1().Pods(namespace).List(options) + return p.List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return d.client.CoreV1().Pods(namespace).Watch(options) + return p.Watch(options) }, } pod := NewPod( @@ -304,12 +308,13 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { } case RoleService: for _, namespace := range namespaces { + s := d.client.CoreV1().Services(namespace) slw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return d.client.CoreV1().Services(namespace).List(options) + return s.List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return d.client.CoreV1().Services(namespace).Watch(options) + return s.Watch(options) }, } svc := NewService( @@ -321,12 +326,13 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { } case RoleIngress: for _, namespace := range namespaces { + i := d.client.ExtensionsV1beta1().Ingresses(namespace) ilw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return d.client.ExtensionsV1beta1().Ingresses(namespace).List(options) + return i.List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return d.client.ExtensionsV1beta1().Ingresses(namespace).Watch(options) + return i.Watch(options) }, } ingress := NewIngress( diff --git a/vendor/github.com/prometheus/prometheus/discovery/manager.go b/vendor/github.com/prometheus/prometheus/discovery/manager.go index 669a91dc559..97468a54907 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/manager.go +++ b/vendor/github.com/prometheus/prometheus/discovery/manager.go @@ -285,7 +285,7 @@ func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[ app("triton", i, t) } if len(cfg.StaticConfigs) > 0 { - app("static", 0, NewStaticProvider(cfg.StaticConfigs)) + app("static", 0, &StaticProvider{cfg.StaticConfigs}) } return providers @@ -296,15 +296,6 @@ type StaticProvider struct { TargetGroups []*targetgroup.Group } -// NewStaticProvider returns a StaticProvider configured with the given -// target groups. -func NewStaticProvider(groups []*targetgroup.Group) *StaticProvider { - for i, tg := range groups { - tg.Source = fmt.Sprintf("%d", i) - } - return &StaticProvider{groups} -} - // Run implements the Worker interface. func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { // We still have to consider that the consumer exits right away in which case diff --git a/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go b/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go index 60a26e8605c..4871214576a 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go +++ b/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go @@ -137,8 +137,11 @@ func NewDiscovery( logger = log.NewNopLogger() } - conn, _, err := zk.Connect(srvs, timeout) - conn.SetLogger(treecache.NewZookeeperLogger(logger)) + conn, _, err := zk.Connect( + srvs, timeout, + func(c *zk.Conn) { + c.SetLogger(treecache.NewZookeeperLogger(logger)) + }) if err != nil { return nil } diff --git a/vendor/github.com/prometheus/prometheus/prompb/README.md b/vendor/github.com/prometheus/prometheus/prompb/README.md new file mode 100644 index 00000000000..d2aa933ef9d --- /dev/null +++ b/vendor/github.com/prometheus/prometheus/prompb/README.md @@ -0,0 +1,14 @@ +The compiled protobufs are version controlled and you won't normally need to +re-compile them when building Prometheus. + +If however you have modified the defs and do need to re-compile, run +`./scripts/genproto.sh` from the parent dir. + +In order for the script to run, you'll need `protoc` (version 3.5) in your +PATH, and the following Go packages installed: + +- github.com/gogo/protobuf +- github.com/gogo/protobuf/protoc-gen-gogofast +- github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway/ +- github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger +- golang.org/x/tools/cmd/goimports diff --git a/vendor/github.com/prometheus/prometheus/promql/engine.go b/vendor/github.com/prometheus/prometheus/promql/engine.go index adf3bf00cd8..20f2faf2292 100644 --- a/vendor/github.com/prometheus/prometheus/promql/engine.go +++ b/vendor/github.com/prometheus/prometheus/promql/engine.go @@ -493,7 +493,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return err } - n.series, err = expandSeriesSet(set) + n.series, err = expandSeriesSet(ctx, set) if err != nil { // TODO(fabxc): use multi-error. level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) @@ -508,7 +508,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return err } - n.series, err = expandSeriesSet(set) + n.series, err = expandSeriesSet(ctx, set) if err != nil { level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) return err @@ -538,8 +538,13 @@ func extractFuncFromPath(p []Node) string { return extractFuncFromPath(p[:len(p)-1]) } -func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { +func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) { for it.Next() { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } res = append(res, it.At()) } return res, it.Err() @@ -1039,6 +1044,9 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { var it *storage.BufferedSeriesIterator for i, s := range node.series { + if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + ev.error(err) + } if it == nil { it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range)) } else { diff --git a/vendor/github.com/prometheus/prometheus/promql/test.go b/vendor/github.com/prometheus/prometheus/promql/test.go index 0b512881c6a..14a5f399f49 100644 --- a/vendor/github.com/prometheus/prometheus/promql/test.go +++ b/vendor/github.com/prometheus/prometheus/promql/test.go @@ -160,7 +160,7 @@ func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) { } ts := testStartTime.Add(time.Duration(offset)) - cmd := newEvalCmd(expr, ts) + cmd := newEvalCmd(expr, ts, i+1) switch mod { case "ordered": cmd.ordered = true @@ -303,6 +303,7 @@ func (cmd *loadCmd) append(a storage.Appender) error { type evalCmd struct { expr string start time.Time + line int fail, ordered bool @@ -319,10 +320,11 @@ func (e entry) String() string { return fmt.Sprintf("%d: %s", e.pos, e.vals) } -func newEvalCmd(expr string, start time.Time) *evalCmd { +func newEvalCmd(expr string, start time.Time, line int) *evalCmd { return &evalCmd{ expr: expr, start: start, + line: line, metrics: map[uint64]labels.Labels{}, expected: map[uint64]entry{}, @@ -437,11 +439,11 @@ func (t *Test) exec(tc testCommand) error { if cmd.fail { return nil } - return fmt.Errorf("error evaluating query %q: %s", cmd.expr, res.Err) + return fmt.Errorf("error evaluating query %q (line %d): %s", cmd.expr, cmd.line, res.Err) } defer q.Close() if res.Err == nil && cmd.fail { - return fmt.Errorf("expected error evaluating query but got none") + return fmt.Errorf("expected error evaluating query %q (line %d) but got none", cmd.expr, cmd.line) } err := cmd.compareResult(res.Value) @@ -454,7 +456,7 @@ func (t *Test) exec(tc testCommand) error { q, _ = t.queryEngine.NewRangeQuery(t.storage, cmd.expr, cmd.start.Add(-time.Minute), cmd.start.Add(time.Minute), time.Minute) rangeRes := q.Exec(t.context) if rangeRes.Err != nil { - return fmt.Errorf("error evaluating query %q in range mode: %s", cmd.expr, rangeRes.Err) + return fmt.Errorf("error evaluating query %q (line %d) in range mode: %s", cmd.expr, cmd.line, rangeRes.Err) } defer q.Close() if cmd.ordered { @@ -477,7 +479,7 @@ func (t *Test) exec(tc testCommand) error { err = cmd.compareResult(vec) } if err != nil { - return fmt.Errorf("error in %s %s rande mode: %s", cmd, cmd.expr, err) + return fmt.Errorf("error in %s %s (line %d) rande mode: %s", cmd, cmd.expr, cmd.line, err) } default: diff --git a/vendor/github.com/prometheus/prometheus/storage/fanout.go b/vendor/github.com/prometheus/prometheus/storage/fanout.go index 32828715972..3ab994391c5 100644 --- a/vendor/github.com/prometheus/prometheus/storage/fanout.go +++ b/vendor/github.com/prometheus/prometheus/storage/fanout.go @@ -450,10 +450,10 @@ func (c *mergeIterator) Next() bool { return false } - currt, currv := c.At() + currt, _ := c.At() for len(c.h) > 0 { - nextt, nextv := c.h[0].At() - if nextt != currt || nextv != currv { + nextt, _ := c.h[0].At() + if nextt != currt { break } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/codec.go b/vendor/github.com/prometheus/prometheus/storage/remote/codec.go index d3858de74f3..66037a9b5ff 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/codec.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/codec.go @@ -96,9 +96,12 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams return nil, err } - rp := &prompb.ReadHints{ - StepMs: p.Step, - Func: p.Func, + var rp *prompb.ReadHints + if p != nil { + rp = &prompb.ReadHints{ + StepMs: p.Step, + Func: p.Func, + } } return &prompb.Query{