diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7a378d55341..60df338bc74 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -355,7 +355,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client } result.Timeseries = append(result.Timeseries, ts) return nil - }) + }, nil, 0) queriedSeries.Observe(float64(numSeries)) queriedSamples.Observe(float64(numSamples)) return result, err @@ -407,22 +407,19 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ Chunks: wireChunks, }) - if len(batch) >= queryStreamBatchSize { - err = stream.Send(&client.QueryStreamResponse{ - Timeseries: batch, - }) - batch = batch[:0] + return nil + }, func(ctx context.Context) error { + if len(batch) == 0 { + return nil } - return err - }) - if err != nil { - return err - } - - if len(batch) > 0 { err = stream.Send(&client.QueryStreamResponse{ Timeseries: batch, }) + batch = batch[:0] + return err + }, queryStreamBatchSize) + if err != nil { + return err } queriedSeries.Observe(float64(numSeries)) @@ -494,7 +491,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr metrics[fp] = series.labels() } return nil - }); err != nil { + }, nil, 0); err != nil { return nil, err } } diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 9161bf1fcb8..1a399b6df25 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -257,9 +257,18 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labelPairs) { memSeries.Dec() } -// forSeriesMatching passes all series matching the given matchers to the provided callback. -// Deals with locking and the quirks of zero-length matcher values. -func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, callback func(context.Context, model.Fingerprint, *memorySeries) error) error { +// forSeriesMatching passes all series matching the given matchers to the +// provided callback. Deals with locking and the quirks of zero-length matcher +// values. There are 2 callbacks: +// - The `add` callback is called for each series while the lock is held, and +// is intend to be used by the caller to build a batch. +// - The `send` callback is called at certain intervals specified by batchSize +// with no locks held, and is intended to be used by the caller to send the +// built batches. +func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, + add func(context.Context, model.Fingerprint, *memorySeries) error, + send func(context.Context) error, batchSize int, +) error { log, ctx := spanlogger.New(ctx, "forSeriesMatching") defer log.Finish() @@ -273,7 +282,7 @@ func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels // fps is sorted, lock them in order to prevent deadlocks outer: - for _, fp := range fps { + for i, fp := range fps { if err := ctx.Err(); err != nil { return err } @@ -292,12 +301,21 @@ outer: } } - err := callback(ctx, fp, series) + err := add(ctx, fp, series) u.fpLocker.Unlock(fp) if err != nil { return err } + + if batchSize > 0 && i+1%batchSize == 0 && send != nil { + if err = send(ctx); err != nil { + return nil + } + } } + if send != nil { + return send(ctx) + } return nil }