From 414ad618cff65f0d232e150c72acc8f8d395154f Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Feb 2019 11:47:55 +0530 Subject: [PATCH 1/3] Break forSeriesMatching callback into 2 callbacks Signed-off-by: Ganesh Vernekar --- pkg/ingester/ingester.go | 18 +++++++++--------- pkg/ingester/user_state.go | 20 ++++++++++++++++++-- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7a378d55341..bd3f53779b4 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -355,7 +355,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client } result.Timeseries = append(result.Timeseries, ts) return nil - }) + }, nil, 0) queriedSeries.Observe(float64(numSeries)) queriedSamples.Observe(float64(numSamples)) return result, err @@ -407,14 +407,14 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ Chunks: wireChunks, }) - if len(batch) >= queryStreamBatchSize { - err = stream.Send(&client.QueryStreamResponse{ - Timeseries: batch, - }) - batch = batch[:0] - } + return nil + }, func(ctx context.Context) error { + err = stream.Send(&client.QueryStreamResponse{ + Timeseries: batch, + }) + batch = batch[:0] return err - }) + }, queryStreamBatchSize) if err != nil { return err } @@ -494,7 +494,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr metrics[fp] = series.labels() } return nil - }); err != nil { + }, nil, 0); err != nil { return nil, err } } diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 9161bf1fcb8..cfc100c8ae2 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -259,7 +259,11 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labelPairs) { // forSeriesMatching passes all series matching the given matchers to the provided callback. // Deals with locking and the quirks of zero-length matcher values. -func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, callback func(context.Context, model.Fingerprint, *memorySeries) error) error { +// There are 2 callbacks. callback1 is called for each series, where the lock is held. +// callback2 is called at certain intervals specified by callback2Interval, +// i.e. callback2() after callback2Interval calls of callback1(). +func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, callback1 func(context.Context, model.Fingerprint, *memorySeries) error, + callback2 func(context.Context) error, callback2Interval int) error { log, ctx := spanlogger.New(ctx, "forSeriesMatching") defer log.Finish() @@ -271,6 +275,10 @@ func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels level.Debug(log).Log("series", len(fps)) + iter := 0 + if callback2Interval <= 0 { + callback2Interval = 1 + } // fps is sorted, lock them in order to prevent deadlocks outer: for _, fp := range fps { @@ -292,11 +300,19 @@ outer: } } - err := callback(ctx, fp, series) + err := callback1(ctx, fp, series) + iter++ u.fpLocker.Unlock(fp) if err != nil { return err } + + if iter >= callback2Interval && callback2 != nil { + if err = callback2(ctx); err != nil { + return nil + } + iter = 0 + } } return nil From 2c6a7f29b7cbafa2d5ab604bb183ebc0f43265c3 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sat, 23 Feb 2019 11:35:15 -0500 Subject: [PATCH 2/3] Naming & feedback. Signed-off-by: Tom Wilkie --- pkg/ingester/ingester.go | 9 +++------ pkg/ingester/user_state.go | 21 +++++++++------------ 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index bd3f53779b4..60df338bc74 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -409,6 +409,9 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return nil }, func(ctx context.Context) error { + if len(batch) == 0 { + return nil + } err = stream.Send(&client.QueryStreamResponse{ Timeseries: batch, }) @@ -419,12 +422,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return err } - if len(batch) > 0 { - err = stream.Send(&client.QueryStreamResponse{ - Timeseries: batch, - }) - } - queriedSeries.Observe(float64(numSeries)) queriedChunks.Observe(float64(numChunks)) level.Debug(log).Log("streams", numSeries) diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index cfc100c8ae2..074d05b3e3f 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -262,8 +262,8 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labelPairs) { // There are 2 callbacks. callback1 is called for each series, where the lock is held. // callback2 is called at certain intervals specified by callback2Interval, // i.e. callback2() after callback2Interval calls of callback1(). -func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, callback1 func(context.Context, model.Fingerprint, *memorySeries) error, - callback2 func(context.Context) error, callback2Interval int) error { +func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, append func(context.Context, model.Fingerprint, *memorySeries) error, + send func(context.Context) error, batchSize int) error { log, ctx := spanlogger.New(ctx, "forSeriesMatching") defer log.Finish() @@ -275,13 +275,9 @@ func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels level.Debug(log).Log("series", len(fps)) - iter := 0 - if callback2Interval <= 0 { - callback2Interval = 1 - } // fps is sorted, lock them in order to prevent deadlocks outer: - for _, fp := range fps { + for i, fp := range fps { if err := ctx.Err(); err != nil { return err } @@ -300,20 +296,21 @@ outer: } } - err := callback1(ctx, fp, series) - iter++ + err := append(ctx, fp, series) u.fpLocker.Unlock(fp) if err != nil { return err } - if iter >= callback2Interval && callback2 != nil { - if err = callback2(ctx); err != nil { + if batchSize > 0 && i+1%batchSize == 0 && send != nil { + if err = send(ctx); err != nil { return nil } - iter = 0 } } + if send != nil { + return send(ctx) + } return nil } From 8207e5985e4f2ffbbbb8929a61e688715097807d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 11 Mar 2019 14:28:51 +0000 Subject: [PATCH 3/3] Review feedback Signed-off-by: Tom Wilkie --- pkg/ingester/user_state.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 074d05b3e3f..1a399b6df25 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -257,13 +257,18 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labelPairs) { memSeries.Dec() } -// forSeriesMatching passes all series matching the given matchers to the provided callback. -// Deals with locking and the quirks of zero-length matcher values. -// There are 2 callbacks. callback1 is called for each series, where the lock is held. -// callback2 is called at certain intervals specified by callback2Interval, -// i.e. callback2() after callback2Interval calls of callback1(). -func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, append func(context.Context, model.Fingerprint, *memorySeries) error, - send func(context.Context) error, batchSize int) error { +// forSeriesMatching passes all series matching the given matchers to the +// provided callback. Deals with locking and the quirks of zero-length matcher +// values. There are 2 callbacks: +// - The `add` callback is called for each series while the lock is held, and +// is intend to be used by the caller to build a batch. +// - The `send` callback is called at certain intervals specified by batchSize +// with no locks held, and is intended to be used by the caller to send the +// built batches. +func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, + add func(context.Context, model.Fingerprint, *memorySeries) error, + send func(context.Context) error, batchSize int, +) error { log, ctx := spanlogger.New(ctx, "forSeriesMatching") defer log.Finish() @@ -296,7 +301,7 @@ outer: } } - err := append(ctx, fp, series) + err := add(ctx, fp, series) u.fpLocker.Unlock(fp) if err != nil { return err