Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,46 @@ type MergeQuerier struct {
// QueryRange fetches series for a given time range and label matchers from multiple
// promql.Queriers and returns the merged results as a map of series iterators.
func (qm MergeQuerier) QueryRange(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
fpToIt := map[model.Fingerprint]local.SeriesIterator{}

// Fetch samples from all queriers and group them by fingerprint (unsorted
// and with overlap).
// Fetch samples from all queriers in parallel
matrices := make(chan model.Matrix)
errors := make(chan error)
for _, q := range qm.Queriers {
matrix, err := q.Query(ctx, from, to, matchers...)
if err != nil {
return nil, err
}
go func(q Querier) {
matrix, err := q.Query(ctx, from, to, matchers...)
if err != nil {
errors <- err
} else {
matrices <- matrix
}
}(q)
}

for _, ss := range matrix {
fp := ss.Metric.Fingerprint()
if it, ok := fpToIt[fp]; !ok {
fpToIt[fp] = sampleStreamIterator{
ss: ss,
// Group them by fingerprint (unsorted and with overlap).
fpToIt := map[model.Fingerprint]local.SeriesIterator{}
var lastErr error
for i := 0; i < len(qm.Queriers); i++ {
select {
case err := <-errors:
lastErr = err

case matrix := <-matrices:
for _, ss := range matrix {
fp := ss.Metric.Fingerprint()
if it, ok := fpToIt[fp]; !ok {
fpToIt[fp] = sampleStreamIterator{
ss: ss,
}
} else {
ssIt := it.(sampleStreamIterator)
ssIt.ss.Values = util.MergeSamples(ssIt.ss.Values, ss.Values)
}
} else {
ssIt := it.(sampleStreamIterator)
ssIt.ss.Values = util.MergeSamples(ssIt.ss.Values, ss.Values)
}
}
}
if lastErr != nil {
return nil, lastErr
}

iterators := make([]local.SeriesIterator, 0, len(fpToIt))
for _, it := range fpToIt {
Expand Down