pkg/trace: Add client stats aggregator #7875
Conversation
```go
	Stats: stats,
}}
res = append(res, pb.ClientStatsPayload{
	Hostname: payloadKey.hostname,
```
Does it have an impact that we are missing Lang, TracerVersion, RuntimeID, Sequence?
Are these fields used anywhere?
No, these fields are not currently used or aggregated on.
They lose their meaning on a payload that aggregates counts, as multiple tracers/versions/langs/runtime IDs/sequences could be involved.
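For context, a rough sketch of the payload fields under discussion (illustrative only; the real type is the generated `pb.ClientStatsPayload`):

```go
package main

import "fmt"

// clientStatsPayload mirrors the fields discussed above; this is an
// illustrative sketch, not the generated pb.ClientStatsPayload type.
type clientStatsPayload struct {
	Hostname      string
	Env           string
	Version       string
	Lang          string // informative only, dropped when payloads are merged
	TracerVersion string // informative only
	RuntimeID     string // informative only
	Sequence      uint64 // informative only
}

func main() {
	p := clientStatsPayload{Hostname: "agent-host", Env: "prod"}
	// When counts from multiple tracers are merged into one payload,
	// Lang/TracerVersion/RuntimeID/Sequence no longer identify a single
	// tracer, so they are left empty.
	fmt.Printf("%+v\n", p)
}
```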
```go
}

// aggregate separately hits, errors, duration
// Distributions and TopLevelCount will stay on the initial payload
```
Where is TopLevelCount used?
It's used in TSA to define primaryName later in the pipeline.
```proto
string runtimeID = 7;
uint64 sequence = 8;
string lang = 5; // informative field not used for aggregation
string tracerVersion = 6; // informative field not used for aggregation
```
The comments should describe what these fields are, not where they are used or set: what they specify and what their values represent :)
pkalmakis left a comment:
I don't see any blockers here, but I'm concerned about the mixed use of int types and time.Time in ClientStatsAggregator. You are converting between the two, and up/down scaling between seconds and nanoseconds in several places. I'm worried about the potential for bugs. We should be able to simplify this.
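One possible direction, sketched here as an assumption rather than a prescription: keep all bucket arithmetic in `time.Time` and `time.Duration` so no manual second/nanosecond scaling is needed. `bucketStart` is a hypothetical name; the 2s constant is the bucket duration this PR describes.

```go
package main

import (
	"fmt"
	"time"
)

const bucketDuration = 2 * time.Second

// bucketStart stays entirely in time.Time, avoiding manual
// second/nanosecond scaling: Truncate rounds down to a bucket boundary.
func bucketStart(t time.Time) time.Time {
	return t.Truncate(bucketDuration)
}

func main() {
	now := time.Now()
	fmt.Println(bucketStart(now))
	// The bucket start is always on a 2s boundary:
	fmt.Println(bucketStart(now).UnixNano() % bucketDuration.Nanoseconds()) // 0
}
```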
```go
case t := <-a.flushTicker.C:
	a.flushOnTime(t)
case input := <-a.In:
	a.add(time.Now(), input)
```
Testing might be easier if this struct has a field like `clock func() time.Time` that you call instead of explicitly calling `time.Now()`. The value for this field can be `time.Now` by default, but you can override it in tests. A minimal sketch of this pattern follows.
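Illustrative only, with trimmed-down, hypothetical names:

```go
package main

import (
	"fmt"
	"time"
)

// clientStatsAggregator is a trimmed-down illustration of the pattern,
// not the real pkg/trace type: the clock field defaults to time.Now and
// can be overridden in tests for deterministic behavior.
type clientStatsAggregator struct {
	clock func() time.Time
}

func newClientStatsAggregator() *clientStatsAggregator {
	return &clientStatsAggregator{clock: time.Now}
}

// add reads the current time through the injected clock rather than
// calling time.Now() directly.
func (a *clientStatsAggregator) add() time.Time {
	return a.clock()
}

func main() {
	a := newClientStatsAggregator()
	fmt.Println(a.add()) // wall-clock time

	// In a test, freeze the clock to validate timing-dependent behavior:
	a.clock = func() time.Time { return time.Unix(1600000000, 0) }
	fmt.Println(a.add()) // always 2020-09-13T12:26:40Z
}
```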
I couldn't find a way to simplify my tests with the clock, so I did not change this.
It can be helpful if you want to validate behavior under certain timing conditions.
Introduce span metrics computation to the trace client.

## Motivation

Today, metrics computation on traces sent via the `/v0.4/traces` endpoint is performed in the agent. This means that **all** traces need to be sent to the agent in order to have metrics computed, _even if the trace is unsampled_. Trace data is big relative to metric data, so a lot of unnecessary work is done to encode and transmit traces just to get metrics. Performing metrics computation in the client enables dropping unsampled traces and saves subsequent client and agent processing.

Datadog Agent PR introducing support for client stats computation: DataDog/datadog-agent#7875 (released in v7.28.0)

## Implementation

Stats computation is done by introducing a new SpanProcessor, `SpanStatsProcessorV06`. This processor handles span finish events, computes the metrics for **measured** spans and periodically (every 10 seconds) flushes the metrics to the agent via the `/v0.6/stats` endpoint.

**Measured** spans are spans that are sampled (either automatically or manually) and are

1. Service entry spans - the highest level span for a service.
2. Marked to have metrics computed (have the `_dd.measured: 1` tag).

### Enabling

The feature is enabled via the `DD_TRACE_COMPUTE_STATS` environment variable or `config._compute_stats`. The feature is disabled by default as we have no endpoint discovery mechanism implemented in the client; otherwise it should be enabled by default.

- [x] done

### Indicating that metrics computation has been performed

The agent needs to know whether or not to compute metrics for a given payload so that it doesn't compute the metrics again. This is done by sending the `Datadog-Client-Computed-Stats` header with a value of `yes` (it is implicitly `no` in the agent) with requests to the `/v0.4/traces` and `/v0.5/traces` endpoints.

### Aggregation

The computed metrics are aggregated. This saves considerable resources both in the client and the backend. The aggregation is performed over the following span attributes: `name`, `service`, `resource`, `type`, `http.status_code` and `synthetics` (whether or not the span is from synthetics). A sketch of such an aggregation key is shown after the Errors section below.

- [x] done

### Normalization

As the metrics are aggregated by the above attributes, it is crucial that they effectively slice the data into meaningful, low-cardinality segments. For example, if the resource name used is the raw URL of each request of a service that handles `/users/<user_id>` requests, then the computed metrics would be aggregated by `/users/1234`, `/users/23`, etc. This isn't useful for users and is expensive to compute. `name`, `service`, `type`, `http.status_code` and `synthetics` don't require normalization as they should already be low cardinality. The one that needs addressing is `resource`.

- [x] done*: the logic is performed [in the agent](https://cs.github.com/DataDog/datadog-agent/blob/fc26ba4839963979964bd4ade8c0991a75076b33/pkg/trace/agent/agent.go#L370); a client-side implementation would be required in order to go agentless.

### Errors

The Java client and Datadog agent perform logic to sample all error spans even if they are marked as unsampled by the sampler or user.

- [x] done*: opted to not implement this logic for now.
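As referenced above, a minimal Go sketch of such an aggregation key (field names are hypothetical; the client implementation here is in Python, Go is used only for illustration):

```go
package main

import "fmt"

// aggregationKey is a hypothetical illustration of the attributes the
// computed metrics are aggregated over.
type aggregationKey struct {
	Name           string
	Service        string
	Resource       string
	Type           string
	HTTPStatusCode uint32
	Synthetics     bool
}

func main() {
	counts := map[aggregationKey]uint64{}
	k := aggregationKey{
		Name:           "flask.request",
		Service:        "users-api",
		Resource:       "GET /users/<user_id>", // normalized, not the raw URL
		Type:           "web",
		HTTPStatusCode: 200,
	}
	counts[k]++ // one hit for this unique combination
	fmt.Println(counts)
}
```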
### Computed metrics

For each **measured** span the following aggregated metrics are computed:

- hits (count): a count of how many times the aggregated span has been created.
- top level hits (count): if the aggregated span is top level, how many times it has been created.
- duration (count): the cumulative duration of the aggregated span.
- ok summary (DDSketch distribution): a distribution of the aggregated span duration, if successful.
- error summary (DDSketch distribution): a distribution of the aggregated span duration, if erroneous.

### DDSketch

The `ok summary` and `error summary` distributions are [`DDSketch`](http://www.vldb.org/pvldb/vol12/p2195-masson.pdf)es configured as `LogCollapsingLowestDenseDDSketch`es with a relative error rate of **0.775%** and a maximum of **2048** bins. This configuration matches the implementation used on the backend, which avoids the [error introduced](https://docs.google.com/document/d/1zZ-WPXPTcegUDwC9EBAI7cngqf3LGSTCFbqPIjpV-0k/edit#heading=h.50ldylsk3sb2) when converting the sketches. The sketches are serialized to protobuf and included in the msgpack payload as raw bytes, which are forwarded by the agent to the backend. A construction sketch is shown after the payload description below.

- [x] done

### `v0.6/stats` payload

The computed metrics are placed into time **buckets**. A bucket is a start time and a duration. Each bucket is 10 seconds in size and metrics are placed into buckets according to the measured span's end time. 10 seconds is chosen arbitrarily but with the rough motivation of timeliness (less is better) and aggregation (more is better). The idea is to aggregate the metrics into buckets and flush them every bucket duration.

The payload sent to `v0.6/stats` is a list of buckets.

headers:

```
"Datadog-Meta-Lang": "python",
"Datadog-Meta-Tracer-Version": ddtrace.__version__,
"Content-Type": "application/msgpack",
```

body (msgpack):

```
[ // list of buckets
  {
    "Start": ...    // start time in ns
    "Duration": ... // duration in ns
    "Stats": [ // list of aggregated metrics
      {
        "Name": ...       // span name
        "Resource": ...   // span resource
        "Service": ...    // span service
        "Type": ...       // span type
        "Synthetics": ... // if span was generated from synthetics
        "Hits": ...
        "TopLevelHits": ...
        "Duration": ...
        "Errors": ...
        "OkSummary": ...    // protobuf-serialized (bytes) distribution
        "ErrorSummary": ... // protobuf-serialized (bytes) distribution
      },
      ...
    ]
  },
  ...
]
```

- [x] done
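The sketch construction referenced above, illustrated with the `github.com/DataDog/sketches-go/ddsketch` package (this assumes its `LogCollapsingLowestDenseDDSketch` constructor; the values follow the configuration described):

```go
package main

import (
	"fmt"
	"log"

	"github.com/DataDog/sketches-go/ddsketch"
)

func main() {
	// Relative accuracy 0.775% and at most 2048 bins, matching the
	// configuration described above.
	sk, err := ddsketch.LogCollapsingLowestDenseDDSketch(0.00775, 2048)
	if err != nil {
		log.Fatal(err)
	}
	// Record durations (in ns) of successful spans.
	for _, d := range []float64{1.2e6, 3.4e6, 9.9e6} {
		if err := sk.Add(d); err != nil {
			log.Fatal(err)
		}
	}
	q, err := sk.GetValueAtQuantile(0.99)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("p99 ≈ %.0f ns\n", q)
}
```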
## Testing/verification

### Performance

Manual performance testing showed negligible improvements to the trace client but substantial improvements to the Datadog Agent.

### System tests/test agent snapshots

- [x] Metrics should be serialized properly in msgpack format with the required fields.
- [x] Metrics should consist of an error count, a hit count (all hits, including the errors), separate ok and error latency distributions, and the sum of all durations (errors and ok included) for each unique aggregation key.
- [ ] Metrics should be reported on a configurable interval.
  - TODO: is there a specified env variable for this?
  - Also use this to report stats payloads more quickly for system tests.
- [x] Metrics should be computed for each distinct combination of [service, resource, operation, type, http status code] (non-HTTP spans ignore the status code).
  - Create an endpoint that supports custom values for each of the fields.
  - Send a variety of requests to the test web app and verify the stats.
- [x] Metrics should be computed for measured spans.
  - Confirm: create two span traces with distinct operation names and mark one as measured. Pass these traces through the metrics aggregation system. Verify that no metrics are computed for the unmeasured span's operation name and that metrics are computed for the measured span's operation name.
- [x] Metrics should be computed for top level spans.
  - Test endpoint: create two span traces with distinct operation names and mark one as top level. Pass these traces through the metrics aggregation system. Verify that no metrics are computed for the non-top-level span's operation name and that metrics are computed for the top level span's operation name.
- [x] Some metrics should be recorded separately for errors and successful spans.
  - The error span count, ok latency sketch and error latency sketch should be tracked separately; the total duration and total hit count should include latencies from all spans.
  - Confirm: pass traces with a known mix of ok and error spans, and verify the counts are tracked separately.
- [x] Metrics must be computed before traces can be dropped or sampled.
  - Confirm: this can be verified by inspection, and how this is achieved depends on tracer architecture, but a tracer-level test with a trace reporter which always drops can be verified to produce metrics. Test that metrics are produced with sample rates set to 0%.
- [ ] Metrics must be computed after spans are finished, otherwise components of the aggregation key may change after contributing to aggregates.
- [ ] The latency at each quantile of [p50, p75, p95, p99, max] should be within 1% of the actual latency at each quantile (see the empirical-quantile helper sketched after the Reliability Env section).
  - Test endpoint: generate 1000 non-error spans in the same bucket with latencies of various distributions, and compute the empirical quantiles from these distributions (sort the latencies, take the 500th for p50, the 990th for p99, the last for max, and so on), then pass these spans through the aggregator. Force publication of the aggregates, intercept the publication and verify the statistics of the ok latency sketch.
- [ ] Metrics storage should be finite.
  - Test endpoint: present enough distinct combinations of [service, resource, operation, type, http status code] to the aggregator within a reporting interval, and verify that one of them is dropped. The Java tracer uses an LRU eviction policy, so verify the first metric is dropped. The Java tracer will track up to 1000 buckets.
- [ ] Metrics tracking should have low CPU overhead.
  - Confirm: run the tracer with and without metrics enabled for a range of applications in the reliability environment and track the CPU overhead. Aim for at most a 5% overhead over disabled metrics.
- Metrics should be buffered for short periods of time if the agent is unavailable.
  - Confirm: this can easily be simulated with a mock agent which responds slowly and an accelerated reporting interval.
- It should be possible to disable metrics aggregation.
  - Confirm: the environment variable `DD_TRACE_TRACER_METRICS_ENABLED` is used to toggle the feature.
- [ ] Obfuscation occurs in the client.

### Manual testing

Testing notebook: https://app.datadoghq.com/notebook/2036264/client-trace-stats-testing

- [x] Metrics reporting should work end to end and be accurate.

### Reliability Env

- [ ] Metrics storage should have a reasonable footprint in RAM.
  - Confirm: construct spans with various cardinalities of combinations of [service, resource, operation, type, http status code], vary latency distributions to stress sketch size, vary error rates, and use a memory footprint analysis tool (Java has JOL) to measure the size of the aggregates storage in memory. 10MB is a reasonable target to aim for.
- Metrics should be automatically disabled if an incompatible agent is detected.
  - Confirm: unit test with a mock agent which yields a 404 for the stats endpoint.
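The empirical-quantile helper referenced in the checklist above, as a toy illustration (hypothetical helper, not part of any tracer):

```go
package main

import (
	"fmt"
	"sort"
)

// empiricalQuantile returns the value at quantile q (0 < q <= 1) from
// latencies, using the "sort and index" method described above.
func empiricalQuantile(latencies []float64, q float64) float64 {
	sorted := append([]float64(nil), latencies...)
	sort.Float64s(sorted)
	idx := int(q*float64(len(sorted))) - 1
	if idx < 0 {
		idx = 0
	}
	return sorted[idx]
}

func main() {
	lat := make([]float64, 1000)
	for i := range lat {
		lat[i] = float64(i + 1) // e.g. latencies of 1ns..1000ns
	}
	fmt.Println(empiricalQuantile(lat, 0.50)) // 500th value
	fmt.Println(empiricalQuantile(lat, 0.99)) // 990th value
	fmt.Println(empiricalQuantile(lat, 1.00)) // max
}
```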
## Links and references

- Computing stats spec: https://docs.google.com/document/d/1xEJzAW_z4H4slBC4E0rafadX69lHuCHJ43c60eSgWr0/edit#heading=h.aohkev5b3a3m
- Context on implementing stats in clients: https://docs.google.com/document/d/10dzyHupAOmHorGubyPTEyyQE6EAyxs1T5f7bzMmpxfk/edit#heading=h.upodtjsy7g56
- Gaps in APM histograms: https://docs.google.com/document/d/1zZ-WPXPTcegUDwC9EBAI7cngqf3LGSTCFbqPIjpV-0k/edit#heading=h.50ldylsk3sb2
- Client stats processing code in the Datadog agent: https://cs.github.com/DataDog/datadog-agent/blob/fc26ba4839963979964bd4ade8c0991a75076b33/pkg/trace/agent/agent.go#L353

## Follow up

- [ ] Disable retrying in the trace writer when stats computation is enabled.
- [ ] Obfuscation: 1.x...Kyle-Verhoog:obfuscators
- [ ] Always sample errors.
- [ ] Enable stats computation by default in 1.0 once an agent minimum version is defined.
Aggregate hits, errors and durations into 2s buckets in the Agent, guaranteeing one value per agent hostname per second. The buckets are not aligned with concentrator buckets. Distributions from the tracer are left untouched and get aggregated in the backend.

Stats generated from tracer payloads fall into 3 cases:

1. (no collision) 1 tracer for 1 Agent: we enrich and forward the tracer's stats payload.
2. (collision) Payloads with histograms are the tracer payloads with one modification: Hits, Errors and Duration are set to 0. These 3 values can be recovered from the histograms.
3. (collision) Payloads with just the aggregated Hits, Errors and Duration counts have no histograms.
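A highly simplified sketch of the collision handling described above (hypothetical types and names; the real logic lives in the client stats aggregator):

```go
package main

import "fmt"

// statsEntry is an illustrative stand-in for an aggregated stats payload
// entry; it is not the real pb type.
type statsEntry struct {
	Hits, Errors, Duration uint64
	HasSketches            bool
}

// splitOnCollision models cases 2 and 3: each colliding payload keeps its
// histograms but has its three counts zeroed (they can be recovered from
// the histograms), while the summed counts travel in a separate,
// histogram-free payload.
func splitOnCollision(colliding []statsEntry) ([]statsEntry, statsEntry) {
	var counts statsEntry // case 3: counts only, no histograms
	for i := range colliding {
		counts.Hits += colliding[i].Hits
		counts.Errors += colliding[i].Errors
		counts.Duration += colliding[i].Duration
		// case 2: zero the counts on histogram-bearing payloads
		colliding[i].Hits, colliding[i].Errors, colliding[i].Duration = 0, 0, 0
	}
	return colliding, counts
}

func main() {
	a := statsEntry{Hits: 10, Errors: 1, Duration: 5000, HasSketches: true}
	b := statsEntry{Hits: 4, Errors: 0, Duration: 1200, HasSketches: true}
	withSketches, counts := splitOnCollision([]statsEntry{a, b})
	fmt.Printf("histogram payloads: %+v\ncounts-only payload: %+v\n", withSketches, counts)
}
```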