diff --git a/querier/querier.go b/querier/querier.go index c47c68f5631..5a337032ca3 100644 --- a/querier/querier.go +++ b/querier/querier.go @@ -76,28 +76,46 @@ type MergeQuerier struct { // QueryRange fetches series for a given time range and label matchers from multiple // promql.Queriers and returns the merged results as a map of series iterators. func (qm MergeQuerier) QueryRange(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) { - fpToIt := map[model.Fingerprint]local.SeriesIterator{} - // Fetch samples from all queriers and group them by fingerprint (unsorted - // and with overlap). + // Fetch samples from all queriers in parallel + matrices := make(chan model.Matrix) + errors := make(chan error) for _, q := range qm.Queriers { - matrix, err := q.Query(ctx, from, to, matchers...) - if err != nil { - return nil, err - } + go func(q Querier) { + matrix, err := q.Query(ctx, from, to, matchers...) + if err != nil { + errors <- err + } else { + matrices <- matrix + } + }(q) + } - for _, ss := range matrix { - fp := ss.Metric.Fingerprint() - if it, ok := fpToIt[fp]; !ok { - fpToIt[fp] = sampleStreamIterator{ - ss: ss, + // Group them by fingerprint (unsorted and with overlap). + fpToIt := map[model.Fingerprint]local.SeriesIterator{} + var lastErr error + for i := 0; i < len(qm.Queriers); i++ { + select { + case err := <-errors: + lastErr = err + + case matrix := <-matrices: + for _, ss := range matrix { + fp := ss.Metric.Fingerprint() + if it, ok := fpToIt[fp]; !ok { + fpToIt[fp] = sampleStreamIterator{ + ss: ss, + } + } else { + ssIt := it.(sampleStreamIterator) + ssIt.ss.Values = util.MergeSamples(ssIt.ss.Values, ss.Values) } - } else { - ssIt := it.(sampleStreamIterator) - ssIt.ss.Values = util.MergeSamples(ssIt.ss.Values, ss.Values) } } } + if lastErr != nil { + return nil, lastErr + } iterators := make([]local.SeriesIterator, 0, len(fpToIt)) for _, it := range fpToIt {