diff --git a/CHANGELOG.md b/CHANGELOG.md index 44ddb10a60..f110486d74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ * [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156 * [ENHANCEMENT] Upgrade to go 1.25. #7164 * [ENHANCEMENT] Upgraded container base images to `alpine:3.23`. #7163 +* [ENHANCEMENT] Instrument Ingester CPU profile with userID for read APIs. #7184 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index acb8bac27a..adbb136f15 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "runtime" + "runtime/pprof" "slices" "strings" "sync" @@ -1780,6 +1781,14 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery return nil, err } + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + resp, err = i.queryExemplars(ctx, userID, req) + }) + return resp, err +} + +func (i *Ingester) queryExemplars(ctx context.Context, userID string, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) { from, through, matchers, err := client.FromExemplarQueryRequest(i.matchersCache, req) if err != nil { return nil, err @@ -1836,33 +1845,55 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelsValuesCommon(ctx, req) - defer cleanup() + + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return nil, userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var cleanup func() + resp, cleanup, err = i.labelsValuesCommon(ctx, req) + defer cleanup() + }) return resp, err } // LabelValuesStream returns all label values that are associated with a given label name. func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req) - defer cleanup() - if err != nil { - return err + ctx := stream.Context() + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return userErr } - for i := 0; i < len(resp.LabelValues); i += metadataStreamBatchSize { - j := min(i+metadataStreamBatchSize, len(resp.LabelValues)) - resp := &client.LabelValuesStreamResponse{ - LabelValues: resp.LabelValues[i:j], - } - err := client.SendLabelValuesStream(stream, resp) + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var resp *client.LabelValuesResponse + var cleanup func() + resp, cleanup, err = i.labelsValuesCommon(ctx, req) + defer cleanup() + if err != nil { - return err + return } - } - return nil + for i := 0; i < len(resp.LabelValues); i += metadataStreamBatchSize { + j := min(i+metadataStreamBatchSize, len(resp.LabelValues)) + resp := &client.LabelValuesStreamResponse{ + LabelValues: resp.LabelValues[i:j], + } + err = client.SendLabelValuesStream(stream, resp) + if err != nil { + return + } + } + }) + + return err } // labelsValuesCommon returns all label values that are associated with a given label name. @@ -1930,33 +1961,55 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu // LabelNames return all the label names. func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelNamesCommon(ctx, req) - defer cleanup() + + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return nil, userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var cleanup func() + resp, cleanup, err = i.labelNamesCommon(ctx, req) + defer cleanup() + }) return resp, err } // LabelNamesStream return all the label names. func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelNamesCommon(stream.Context(), req) - defer cleanup() - if err != nil { - return err + ctx := stream.Context() + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return userErr } - for i := 0; i < len(resp.LabelNames); i += metadataStreamBatchSize { - j := min(i+metadataStreamBatchSize, len(resp.LabelNames)) - resp := &client.LabelNamesStreamResponse{ - LabelNames: resp.LabelNames[i:j], - } - err = client.SendLabelNamesStream(stream, resp) + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var resp *client.LabelNamesResponse + var cleanup func() + resp, cleanup, err = i.labelNamesCommon(ctx, req) + defer cleanup() + if err != nil { - return err + return } - } - return nil + for i := 0; i < len(resp.LabelNames); i += metadataStreamBatchSize { + j := min(i+metadataStreamBatchSize, len(resp.LabelNames)) + resp := &client.LabelNamesStreamResponse{ + LabelNames: resp.LabelNames[i:j], + } + err = client.SendLabelNamesStream(stream, resp) + if err != nil { + return + } + } + }) + + return err } // labelNamesCommon return all the label names. @@ -2024,49 +2077,70 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error) { defer recoverIngester(i.logger, &err) - result = &client.MetricsForLabelMatchersResponse{} - cleanup, err := i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error { - result.Metric = append(result.Metric, &cortexpb.Metric{ - Labels: cortexpb.FromLabelsToLabelAdapters(l), + + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return nil, userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + result = &client.MetricsForLabelMatchersResponse{} + var cleanup func() + cleanup, err = i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error { + result.Metric = append(result.Metric, &cortexpb.Metric{ + Labels: cortexpb.FromLabelsToLabelAdapters(l), + }) + return nil }) - return nil + defer cleanup() }) - defer cleanup() return result, err } func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) (err error) { defer recoverIngester(i.logger, &err) - result := &client.MetricsForLabelMatchersStreamResponse{} - cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req, func(l labels.Labels) error { - result.Metric = append(result.Metric, &cortexpb.Metric{ - Labels: cortexpb.FromLabelsToLabelAdapters(l), + ctx := stream.Context() + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + result := &client.MetricsForLabelMatchersStreamResponse{} + + var cleanup func() + cleanup, err = i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error { + result.Metric = append(result.Metric, &cortexpb.Metric{ + Labels: cortexpb.FromLabelsToLabelAdapters(l), + }) + + if len(result.Metric) >= metadataStreamBatchSize { + err := client.SendMetricsForLabelMatchersStream(stream, result) + if err != nil { + return err + } + result.Metric = result.Metric[:0] + } + return nil }) + defer cleanup() + if err != nil { + return + } - if len(result.Metric) >= metadataStreamBatchSize { - err := client.SendMetricsForLabelMatchersStream(stream, result) + // Send last batch + if len(result.Metric) > 0 { + err = client.SendMetricsForLabelMatchersStream(stream, result) if err != nil { - return err + return } - result.Metric = result.Metric[:0] } - return nil }) - defer cleanup() - if err != nil { - return err - } - - // Send last batch - if len(result.Metric) > 0 { - err = client.SendMetricsForLabelMatchersStream(stream, result) - if err != nil { - return err - } - } - return nil + return err } // metricsForLabelMatchersCommon returns all the metrics which match a set of matchers. @@ -2160,7 +2234,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien } // MetricsMetadata returns all the metric metadata of a user. -func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (*client.MetricsMetadataResponse, error) { +func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error) { i.stoppedMtx.RLock() if err := i.checkRunningOrStopping(); err != nil { i.stoppedMtx.RUnlock() @@ -2173,13 +2247,19 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad return nil, err } - userMetadata := i.getUserMetadata(userID) + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + userMetadata := i.getUserMetadata(userID) - if userMetadata == nil { - return &client.MetricsMetadataResponse{}, nil - } + if userMetadata == nil { + resp = &client.MetricsMetadataResponse{} + return + } + + resp = &client.MetricsMetadataResponse{Metadata: userMetadata.toClientMetadata(req)} + }) - return &client.MetricsMetadataResponse{Metadata: userMetadata.toClientMetadata(req)}, nil + return resp, nil } // CheckReady is the readiness handler used to indicate to k8s when the ingesters @@ -2309,6 +2389,15 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return err } + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + err = i.queryStream(ctx, userID, req, stream, spanlog) + }) + + return err +} + +func (i *Ingester) queryStream(ctx context.Context, userID string, req *client.QueryRequest, stream client.Ingester_QueryStreamServer, spanlog *spanlogger.SpanLogger) error { from, through, matchers, err := client.FromQueryRequest(i.matchersCache, req) if err != nil { return err