Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ type Config struct {
ExtraQueryDelay time.Duration
LimiterReloadPeriod time.Duration

ShardByAllLabels bool
ShardByAllLabels bool
QueryStreamShards int

// for testing
ingesterClientFactory client.Factory
Expand All @@ -120,6 +121,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
f.IntVar(&cfg.QueryStreamShards, "distributor.query-stream-shards", 1, "Number of parallel calls to use for queries to the ingesters.")
}

// New constructs a new Distributor
Expand Down
49 changes: 37 additions & 12 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"

"github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -125,29 +126,53 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) ([]client.TimeSeriesChunk, error) {
// Fetch samples from multiple ingesters
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
c, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}
ingesterQueries.WithLabelValues(ing.Addr).Inc()

stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req)
if err != nil {
ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
return nil, err
errs := make(chan error, d.cfg.QueryStreamShards)
results := make(chan []*client.QueryStreamResponse, d.cfg.QueryStreamShards)
for i := 0; i < d.cfg.QueryStreamShards; i++ {
go func(i int) {
req := proto.Clone(req).(*client.QueryRequest)
req.ShardIndex = uint32(i)
req.TotalShards = uint32(d.cfg.QueryStreamShards)

stream, err := c.(ingester_client.IngesterClient).QueryStream(ctx, req)
if err != nil {
ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
errs <- err
return
}
defer stream.CloseSend()

var result []*ingester_client.QueryStreamResponse
for {
series, err := stream.Recv()
if err == io.EOF {
break
} else if err != nil {
errs <- err
return
}
result = append(result, series)
}
results <- result
}(i)
}
defer stream.CloseSend()

var result []*ingester_client.QueryStreamResponse
for {
series, err := stream.Recv()
if err == io.EOF {
break
} else if err != nil {
for i := 0; i < d.cfg.QueryStreamShards; i++ {
select {
case r := <-results:
result = append(result, r...)
case err := <-errs:
return nil, err
}
result = append(result, series)
}

return result, nil
})
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/client/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ message QueryRequest {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated LabelMatcher matchers = 3;

uint32 shard_index = 4;
uint32 total_shards = 5;
}

message QueryResponse {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client
result := &client.QueryResponse{}
numSeries, numSamples := 0, 0
maxSamplesPerQuery := i.limits.MaxSamplesPerQuery(userID)
err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error {
err = state.forSeriesMatching(ctx, matchers, 0, 0, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error {
values, err := series.samplesForRange(from, through)
if err != nil {
return err
Expand Down Expand Up @@ -388,7 +388,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
// can iteratively merge them with entries coming from the chunk store. But
// that would involve locking all the series & sorting, so until we have
// a better solution in the ingesters I'd rather take the hit in the queriers.
err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error {
err = state.forSeriesMatching(stream.Context(), matchers, req.ShardIndex, req.TotalShards, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error {
numSeries++
chunks := make([]*desc, 0, len(series.chunkDescs))
for _, chunk := range series.chunkDescs {
Expand Down Expand Up @@ -487,7 +487,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr

metrics := map[model.Fingerprint]labelPairs{}
for _, matchers := range matchersSet {
if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error {
if err := state.forSeriesMatching(ctx, matchers, 0, 0, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error {
if _, ok := metrics[fp]; !ok {
metrics[fp] = client.FromLabelsToLabelPairs(series.metric)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) {
// with no locks held, and is intended to be used by the caller to send the
// built batches.
func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher,
shardIndex, totalShards uint32,
add func(context.Context, model.Fingerprint, *memorySeries) error,
send func(context.Context) error, batchSize int,
) error {
Expand All @@ -286,6 +287,10 @@ outer:
return err
}

if totalShards != 0 && uint64(fp)%uint64(totalShards) != uint64(shardIndex) {
continue
}

u.fpLocker.Lock(fp)
series, ok := u.fpToSeries.get(fp)
if !ok {
Expand Down