From d7427594ba8fbe2dd476dbf47658b60d984f7f50 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 14 Mar 2019 13:05:37 +0000 Subject: [PATCH] Parallelise calls from the querier -> ingester in an attempt to reduce latency. Signed-off-by: Tom Wilkie --- pkg/distributor/distributor.go | 4 ++- pkg/distributor/query.go | 49 ++++++++++++++++++++++++-------- pkg/ingester/client/cortex.proto | 3 ++ pkg/ingester/ingester.go | 6 ++-- pkg/ingester/user_state.go | 5 ++++ 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index c1a07e984c5..5b03ab8e8a3 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -104,7 +104,8 @@ type Config struct { ExtraQueryDelay time.Duration LimiterReloadPeriod time.Duration - ShardByAllLabels bool + ShardByAllLabels bool + QueryStreamShards int // for testing ingesterClientFactory client.Factory @@ -120,6 +121,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.") f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.") + f.IntVar(&cfg.QueryStreamShards, "distributor.query-stream-shards", 1, "Number of parallel calls to use for queries to the ingesters.") } // New constructs a new Distributor diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 3402fc08ffb..a53a06303f7 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -4,6 +4,7 @@ import ( "context" "io" + "github.com/gogo/protobuf/proto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" @@ -125,29 +126,53 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) ([]client.TimeSeriesChunk, error) { // Fetch samples from multiple ingesters results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) { - client, err := d.ingesterPool.GetClientFor(ing.Addr) + c, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err } ingesterQueries.WithLabelValues(ing.Addr).Inc() - stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req) - if err != nil { - ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() - return nil, err + errs := make(chan error, d.cfg.QueryStreamShards) + results := make(chan []*client.QueryStreamResponse, d.cfg.QueryStreamShards) + for i := 0; i < d.cfg.QueryStreamShards; i++ { + go func(i int) { + req := proto.Clone(req).(*client.QueryRequest) + req.ShardIndex = uint32(i) + req.TotalShards = uint32(d.cfg.QueryStreamShards) + + stream, err := c.(ingester_client.IngesterClient).QueryStream(ctx, req) + if err != nil { + ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() + errs <- err + return + } + defer stream.CloseSend() + + var result []*ingester_client.QueryStreamResponse + for { + series, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + errs <- err + return + } + result = append(result, series) + } + results <- result + }(i) } - defer stream.CloseSend() var result []*ingester_client.QueryStreamResponse - for { - series, err := stream.Recv() - if err == io.EOF { - break - } else if err != nil { + for i := 0; i < d.cfg.QueryStreamShards; i++ { + select { + case r := <-results: + result = append(result, r...) + case err := <-errs: return nil, err } - result = append(result, series) } + return result, nil }) if err != nil { diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto index 02e17e35b8a..90ecdd69283 100644 --- a/pkg/ingester/client/cortex.proto +++ b/pkg/ingester/client/cortex.proto @@ -47,6 +47,9 @@ message QueryRequest { int64 start_timestamp_ms = 1; int64 end_timestamp_ms = 2; repeated LabelMatcher matchers = 3; + + uint32 shard_index = 4; + uint32 total_shards = 5; } message QueryResponse { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 05d6b1a1cdd..ef6ea6a5abd 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -329,7 +329,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client result := &client.QueryResponse{} numSeries, numSamples := 0, 0 maxSamplesPerQuery := i.limits.MaxSamplesPerQuery(userID) - err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { + err = state.forSeriesMatching(ctx, matchers, 0, 0, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { values, err := series.samplesForRange(from, through) if err != nil { return err @@ -388,7 +388,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // can iteratively merge them with entries coming from the chunk store. But // that would involve locking all the series & sorting, so until we have // a better solution in the ingesters I'd rather take the hit in the queriers. - err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { + err = state.forSeriesMatching(stream.Context(), matchers, req.ShardIndex, req.TotalShards, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { numSeries++ chunks := make([]*desc, 0, len(series.chunkDescs)) for _, chunk := range series.chunkDescs { @@ -487,7 +487,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr metrics := map[model.Fingerprint]labelPairs{} for _, matchers := range matchersSet { - if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error { + if err := state.forSeriesMatching(ctx, matchers, 0, 0, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error { if _, ok := metrics[fp]; !ok { metrics[fp] = client.FromLabelsToLabelPairs(series.metric) } diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 8944ce2e362..eb2200a2432 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -265,6 +265,7 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) { // with no locks held, and is intended to be used by the caller to send the // built batches. func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, + shardIndex, totalShards uint32, add func(context.Context, model.Fingerprint, *memorySeries) error, send func(context.Context) error, batchSize int, ) error { @@ -286,6 +287,10 @@ outer: return err } + if totalShards != 0 && uint64(fp)%uint64(totalShards) != uint64(shardIndex) { + continue + } + u.fpLocker.Lock(fp) series, ok := u.fpToSeries.get(fp) if !ok {