Conversation
```rust
        debug!("Failed to send context spans to agent: {e}");
    }
} else {
    error!("Failed to process traces, skipping send");
```
Processor won't send spans to `TraceAggregator`
can't we just avoid the extra allocation by doing `if let Some(send_data) = trace_processor...` `else {}`?
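A minimal sketch of the pattern the reviewer is suggesting (names and types are simplified stand-ins, not the PR's actual signatures): matching on the `Option` directly in `if let` consumes it in place, so no extra binding or clone of the payload is needed.

```rust
// Stand-in for trace_processor.process_traces(): returns the payload on
// success, None when API key resolution failed.
fn process_traces(ok: bool) -> Option<String> {
    ok.then(|| String::from("send_data"))
}

// Match on the Option directly instead of binding the result first.
fn flush(ok: bool) -> bool {
    if let Some(send_data) = process_traces(ok) {
        // ... send `send_data` downstream ...
        !send_data.is_empty()
    } else {
        eprintln!("Failed to process traces, skipping send");
        false
    }
}

fn main() {
    assert!(flush(true));
    assert!(!flush(false));
}
```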
```rust
if let Some(req) = self.create_request(batch.clone()).await {
    set.spawn(async move { Self::send(req).await });
} else {
    error!("Failed to create request");
```
Flusher won't create HTTP requests to send data to Datadog at `/api/v2/logs`
```rust
        }
    }
} else {
    error!("Failed to process traces, skipping send");
```
OTLP Agent won't send traces to `TraceFlusher`
```rust
    }
};
} else {
    error!("Failed to create endpoint");
```
`ServerlessStatsFlusher` won't send stats to Datadog's endpoint.
```rust
    ),
}
} else {
    error!("Failed to process traces, skipping send");
```
`TraceAgent` won't send traces to `TraceFlusher`
```rust
    ),
}
} else {
    error_response(
```
`TraceAgent` proxy won't send data to Datadog
```rust
));
Some(send_data)
} else {
    error!("Failed to resolve API key");
```
`TraceProcessor` won't process traces
Pull Request Overview
This PR enhances resilience by turning API key resolution failures into no-ops instead of crashing, allowing the extension to continue running. Key changes include:
- Converting `process_traces` to return `Option<SendData>` and guarding all flush/send paths across multiple components.
- Adding `if let Some` checks around API key resolution in trace, stats, logs, OTLP, and invocation processors.
- Updating the `dogstatsd` dependency revision in Cargo.toml.
Reviewed Changes
Copilot reviewed 7 out of 9 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| trace_processor.rs | Changed return type to Option<SendData> and added if let Some(api_key) guard around endpoint construction. |
| trace_agent.rs | Added initial None check for send_data and unified error responses when API key or send_data is missing. |
| stats_flusher.rs | Changed endpoint cell to OnceCell<Option<Endpoint>> and wrapped stats send logic in if let Some for API key and endpoint. |
| otlp/agent.rs | Wrapped process_traces result in if let Some(send_data) to skip sending when API key resolution fails. |
| logs/flusher.rs | Changed cached headers to OnceCell<Option<HeaderMap>> and made create_request return Option<…> to skip sends. |
| lifecycle/invocation/processor.rs | Updated invocation processor to skip sending when process_traces returns None. |
| Cargo.toml | Bumped dogstatsd revision to 0add16260cca1ec01729a3d99f5a40cf246a2c38. |
Comments suppressed due to low confidence (2)
bottlecap/src/traces/trace_processor.rs:170
- The call to `to_string().clone()` is redundant; `to_string()` already returns a `String`. You can simplify to `api_key: Some(api_key.into()),`.

```rust
api_key: Some(api_key.to_string().into()),
```
bottlecap/src/traces/trace_processor.rs:130
- Consider adding a unit test for the new `None` return path when API key resolution fails, to ensure that `process_traces` correctly returns `None` and skips sending.

```rust
) -> Option<SendData>;
```
```rust
} else {
    error!("Failed to process traces, skipping send");
    error_response(
        StatusCode::INTERNAL_SERVER_ERROR,
        format!("Error sending traces to the trace flusher: {err}"),
    ),
    "Failed to process traces, skipping send",
)
```
This `else` branch is unreachable because you return above when `send_data` is `None`. Consider removing the second `if let`/`else` and unifying error handling for clarity.
```diff
-} else {
-    error!("Failed to process traces, skipping send");
-    error_response(
-        StatusCode::INTERNAL_SERVER_ERROR,
-        format!("Error sending traces to the trace flusher: {err}"),
-    ),
-    "Failed to process traces, skipping send",
-)
```
```rust
Some(Endpoint {
    url: hyper::Uri::from_str(&stats_url)
        .expect("can't make URI from stats url, exiting"),
    api_key: Some(api_key.to_string().clone().into()),
```
Similar to the other location, the `.clone()` on the result of `to_string()` is redundant. Use `api_key.into()` to simplify.

```diff
-api_key: Some(api_key.to_string().clone().into()),
+api_key: Some(api_key.into()),
```
```rust
if let Some(req) = self.create_request(batch.clone()).await {
    set.spawn(async move { Self::send(req).await });
} else {
    error!("Failed to create request");
    continue;
}
```
`let`-`else` is the normal way to avoid extra indentation
```diff
-if let Some(req) = self.create_request(batch.clone()).await {
-    set.spawn(async move { Self::send(req).await });
-} else {
-    error!("Failed to create request");
-    continue;
-}
+let Some(req) = self.create_request(batch.clone()).await else {
+    error!("Failed to create request");
+    continue;
+};
+set.spawn(async move { Self::send(req).await });
```
That's the perfect answer!!
```diff
-let headers = self.get_headers().await;
-self.client
+let Some(headers) = self.get_headers().await else {
+    return None;
```
Mmm, although I understand the code, I find it a little confusing that `get_headers` is responsible for deciding whether or not we're creating a request.
Would it make more sense to rearchitect this so that whenever we definitely know we are about to flush, let's say in the `flush()` method, we try to get the API key?
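A rough sketch of the rearchitecture being proposed (all names here are simplified stand-ins for the real async `Flusher`/`ApiKeyFactory` types): `flush()` resolves the API key once up front and aborts early on failure, so `get_headers()` becomes an infallible helper that merely formats headers from a key it is handed.

```rust
struct Flusher {
    // Stand-in for the ApiKeyFactory; None models a failed resolution.
    resolved_key: Option<String>,
}

impl Flusher {
    fn get_api_key(&self) -> Option<&str> {
        self.resolved_key.as_deref()
    }

    // No Option in the return type: by the time headers are built,
    // the key is known to exist.
    fn get_headers(api_key: &str) -> Vec<(String, String)> {
        vec![("DD-API-KEY".to_string(), api_key.to_string())]
    }

    fn flush(&self) -> usize {
        // Abort early, in one place, if the key cannot be resolved.
        let Some(api_key) = self.get_api_key() else {
            eprintln!("Failed to resolve API key, skipping flush");
            return 0;
        };
        let headers = Self::get_headers(api_key);
        // ... build and send requests with `headers` ...
        headers.len()
    }
}

fn main() {
    let ok = Flusher { resolved_key: Some("secret".into()) };
    assert_eq!(ok.flush(), 1);
    let bad = Flusher { resolved_key: None };
    assert_eq!(bad.flush(), 0);
}
```

The design benefit is separation of concerns: key resolution (which can fail) lives in one early guard, while header construction stays a pure formatting step.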
duncanista left a comment:
Left a comment which I would like to see if we can work around. The main idea is: could we rearchitect so that whenever we hit `flush` we try to resolve the API key, and then do the later work based on it? Instead, we're failing in `headers` when trying to get an API key, but these concerns look like they should be separated 🤔
LMK what you think
```rust
/// These tags are used to capture runtime and initialization.
dynamic_tags: HashMap<String, String>,
/// Function to resolve Datadog API key.
api_key_factory: Arc<ApiKeyFactory>,
```
Add it to an outer struct `Processor`
```rust
trace_processor: &Arc<dyn TraceProcessor + Send + Sync>,
trace_agent_tx: &Sender<SendData>,
) {
    let Some(api_key) = self.api_key_factory.get_api_key().await else {
```
... so we can abort earlier here, without needing to touch many functions
```rust
pub async fn flush(&self, batches: Option<Arc<Vec<Vec<u8>>>>) -> Vec<reqwest::RequestBuilder> {
    let mut set = JoinSet::new();
    let api_key = self.api_key_factory.get_api_key().await;
    let Some(api_key) = api_key else {
```
Abort early at the beginning of `Flusher.flush()`
```rust
vec![traces],
body_size,
self.inferrer.span_pointers.clone(),
api_key,
```
Passing `api_key` to `process_traces()`, so `process_traces()` won't need to handle failure inside.
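The shape of this change can be sketched as follows (types heavily simplified, not the real `SendData`): once the caller resolves the key and passes it in, `process_traces` is infallible and no longer needs an `Option` return type.

```rust
struct SendData {
    api_key: String,
    traces: Vec<String>,
}

// Before: `fn process_traces(..) -> Option<SendData>` had to handle a
// missing key itself. After: the key is just a parameter, so the
// function always succeeds.
fn process_traces(api_key: &str, traces: Vec<String>) -> SendData {
    SendData {
        api_key: api_key.to_string(),
        traces,
    }
}

fn main() {
    let sd = process_traces("secret", vec!["trace-1".into()]);
    assert_eq!(sd.api_key, "secret");
    assert_eq!(sd.traces.len(), 1);
}
```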
```diff
 }

-async fn get_headers(&self) -> &HeaderMap {
+async fn get_headers(&self, api_key: &str) -> &HeaderMap {
```
Passing in `api_key`, so `get_headers()` won't need to handle the failure
```rust
OtlpProcessor,
Arc<dyn TraceProcessor + Send + Sync>,
Sender<SendData>,
Arc<ApiKeyFactory>,
```
Adding it to `AgentState` and `Agent`
```rust
State((config, tags_provider, processor, trace_processor, trace_tx, api_key_factory)): State<AgentState>,
request: Request,
) -> Response {
    let Some(api_key) = api_key_factory.get_api_key().await else {
```
Abort at the beginning of v1_traces API handler
Is it possible that the customer's code calls the `/v1/traces` API synchronously, and we slow down the customer's Lambda by doing the heavy operation of resolving the API key here?
If so, it might be better to further defer key resolution by moving it out of the API handler.
This gets called by an exporter, probably at the end of the function, so yeah, it would be done in runtime time
```rust
    return;
}

let Some(api_key) = self.api_key_factory.get_api_key().await else {
```
Abort at the beginning of `StatsFlusher.send()`
```rust
version: ApiVersion,
api_key_factory: Arc<ApiKeyFactory>,
) -> Response {
    let Some(api_key) = api_key_factory.get_api_key().await else {
```
Abort at the beginning of v0.4 and v0.5 traces API handler
ditto: Is it possible that the customer's code calls the traces API synchronously, and we slow down the customer's Lambda by doing the heavy operation of resolving the API key here?
If so, it might be better to further defer key resolution by moving it out of the API handler.
yeah we need to defer this to the flusher as this API is called synchronously
```rust
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, e),
};

let Some(api_key) = api_key_factory.get_api_key().await else {
```
Abort at the beginning of `handle_proxy()`
@duncanista Good point! Made a lot of changes. One concern is this PR (and the last one) only defers key resolution from init time to the trace API handler (if the trace API handler is called), not to flush time. Although it can shorten cold start time, it can make …
```rust
    traces::trace_processor::TraceProcessor,
};

use dogstatsd::api_key::ApiKeyFactory;
```
Question: do we want to move this to its own file? Wondering if all other components should be relying on `dogstatsd` as a dependency just for an `ApiKeyFactory`
Not needed to be done now, but it would be good to not make them dependent on a metrics module
```diff
 if self.config.proxy_https.is_some() {
     let site_in_no_proxy = std::env::var("NO_PROXY")
-        .map_or(false, |no_proxy| no_proxy.contains(&self.config.site))
+        .is_ok_and(|no_proxy| no_proxy.contains(&self.config.site))
```
fixes a new Clippy error due to the upgrade
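The lint fix above can be reproduced in isolation (the function name and inputs here are stand-ins, not Bottlecap's code): on a `Result`, `.map_or(false, pred)` is what Clippy now asks to be written as `.is_ok_and(pred)` (stable since Rust 1.70), which reads as "the result is `Ok` and the value satisfies the predicate".

```rust
use std::env::VarError;

// Equivalent to `no_proxy.map_or(false, |v| v.contains(site))`,
// but in the form the lint prefers.
fn site_in_no_proxy(no_proxy: Result<String, VarError>, site: &str) -> bool {
    no_proxy.is_ok_and(|v| v.contains(site))
}

fn main() {
    assert!(site_in_no_proxy(
        Ok("datadoghq.com,example.com".into()),
        "datadoghq.com"
    ));
    // An Err (e.g. NO_PROXY unset) short-circuits to false.
    assert!(!site_in_no_proxy(Err(VarError::NotPresent), "datadoghq.com"));
}
```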
```diff
 struct KeyValueVisitor;

-impl<'de> serde::de::Visitor<'de> for KeyValueVisitor {
+impl serde::de::Visitor<'_> for KeyValueVisitor {
```
fixes a new Clippy error due to the upgrade
```rust
use datadog_trace_protobuf::pb::ClientStatsPayload;
use std::collections::VecDeque;

#[allow(clippy::empty_line_after_doc_comments)]
```
```rust
pub struct ProxyState {
    pub config: Arc<config::Config>,
    pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
    pub api_key_factory: Arc<ApiKeyFactory>,
```
Moved to `trace_flusher`.
```rust
let mut traces = guard.get_batch();
// Lazily set the API key
for trace in &mut traces {
    trace.get_target_mut().api_key = Some(api_key.to_string().into());
```
I need to add `get_target_mut()` in DataDog/libdatadog#1140
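A simplified sketch of the loop being discussed (`Target`, `TracePayload`, and `get_target_mut` here are stand-ins for the libdatadog types, not their real definitions): drain a batch and stamp the lazily resolved API key onto each payload's target just before sending.

```rust
struct Target {
    api_key: Option<String>,
}

struct TracePayload {
    target: Target,
}

impl TracePayload {
    // Mutable access to the send target, mirroring the accessor the
    // comment says needs to be added upstream.
    fn get_target_mut(&mut self) -> &mut Target {
        &mut self.target
    }
}

// Lazily set the API key on every payload in the batch.
fn stamp_api_key(traces: &mut [TracePayload], api_key: &str) {
    for trace in traces.iter_mut() {
        trace.get_target_mut().api_key = Some(api_key.to_string());
    }
}

fn main() {
    let mut batch = vec![TracePayload {
        target: Target { api_key: None },
    }];
    stamp_api_key(&mut batch, "secret");
    assert_eq!(batch[0].target.api_key.as_deref(), Some("secret"));
}
```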
…745)

# Background
Right now `SendData` is passed around across channels.

# This PR
Instead of passing `SendData`, pass `SendDataBuilderInfo`, which bundles `SendDataBuilder` and the payload size. Just before flush, call `SendDataBuilder.build()` to build `SendData`.

# Motivation
DataDog/libdatadog#1140 (comment)
It was suggested that `set_api_key()` shouldn't be added on `SendData`, but on `SendDataBuilder`. Because we need to call `set_api_key()` just before flush, we need to make sure the object stays a `SendDataBuilder` instead of a `SendData` until flush time. And because we need the payload size in the Trace Aggregator, and `SendDataBuilder` doesn't expose this field, we need to pass it explicitly along with `SendDataBuilder`.

# Next steps
Update #717 and #732 so that `get_api_key()` is called just before flush.

# Dependency
DataDog/libdatadog#1140
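The `SendDataBuilderInfo` idea described above can be sketched roughly like this (field and method names are assumptions for illustration, not the real libdatadog API): bundle the builder with the payload size the aggregator needs, and only call `build()` at flush time, after the API key has been set on the builder.

```rust
struct SendData {
    api_key: String,
    size: usize,
}

struct SendDataBuilder {
    api_key: Option<String>,
    size: usize,
}

impl SendDataBuilder {
    // Set just before flush, once the key has been lazily resolved.
    fn with_api_key(mut self, key: &str) -> Self {
        self.api_key = Some(key.to_string());
        self
    }

    fn build(self) -> SendData {
        SendData {
            api_key: self.api_key.unwrap_or_default(),
            size: self.size,
        }
    }
}

// Passed across channels instead of SendData; carries the payload size
// explicitly so the aggregator can read it without building.
struct SendDataBuilderInfo {
    builder: SendDataBuilder,
    size: usize,
}

fn main() {
    let info = SendDataBuilderInfo {
        builder: SendDataBuilder { api_key: None, size: 128 },
        size: 128,
    };
    // The aggregator can inspect the size before flush time.
    assert_eq!(info.size, 128);
    // At flush time: set the key, then build the final SendData.
    let sd = info.builder.with_api_key("secret").build();
    assert_eq!(sd.api_key, "secret");
    assert_eq!(sd.size, 128);
}
```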
# Motivation
From @astuyve:
> today we basically block/await on that decrypt call before we can call /next so if we can instead make that async and then resolve the future only when we need to flush data, that can be a big win for many customers.

https://datadoghq.atlassian.net/browse/SVLS-6995

# Previous work
DataDog/serverless-components#21 and DataDog/serverless-components#24 created `ApiKeyFactory`, a util that enables lazy API key resolution.

# This PR
Updates Bottlecap code to use `ApiKeyFactory` to lazily resolve the API key, i.e. instead of resolving it by querying Secrets Manager or KMS during the init phase, do it at flush time when the API key is actually needed.

# Note
This PR changes the behavior when key resolution fails, i.e. when `resolve_secrets()` returns `None`.
- Before: run `extension_loop_idle()`, which does not stop the runtime.
- After: panic, which will stop the runtime (if I understand correctly). Of course it's not ideal. Any better idea?
  - It's harder now to run `extension_loop_idle()` because API key resolution code is not in the main loop anymore, but in various consumer code of the API key.
  - Is there a way to gracefully shut down the extension without affecting the runtime?

Update: Added a PR to address resolution failure: #732. These two PRs should be merged together; keeping them separate just to make review easier.

# Testing
## Setup
- Runtime: Go1 on Amazon Linux 2
- Architecture: arm64
- An app with empty implementation code

## Result
Below is the `Datadog Next-Gen Extension ready in:` time logged.
- Before (prod extension `arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Extension-ARM:82`): 88.6 ± 1.8 ms
- After (test extension `arn:aws:lambda:us-east-1:425362996713:layer:Datadog-Bottlecap-Beta-ARM-yiming:2`): 35.4 ± 5.1 ms (-60.0%)

(screenshot: extension ready-time comparison; both measurements use 5 samples)
# Notes
https://datadoghq.atlassian.net/issues/SVLS-6996
https://datadoghq.atlassian.net/issues/SVLS-6998
- Simplify logic in StatsFlusher
- Move api_key_factory out of TraceProcessor
- Move some code
- Avoid resolving key in trace api and proxy
- Apply to proxy flusher
- Resolve conflicts
- Make trace flusher resolve api key
- Fix Clippy lint
- Format
- Use SendData.set_api_key()
- Fix errors
- Improve comments

Context
The previous PR #717 defers API key resolution from the extension init stage to flush time. However, that PR doesn't handle the failure case well.
What does this PR do?
- Defers API key resolution from `TraceProcessor.process_traces()` to `TraceFlusher.flush()`.

Dependencies
Manual Test
Steps
- Set `DD_API_KEY_SECRET_ARN` to an invalid value
- Set `DD_API_KEY_SECRET_ARN` to a valid value

Result
Automated Test
I didn't add any automated test because, from what I see in the codebase, existing tests are usually unit tests for short functions, not for the long functions this PR touches. Please let me know if you think I should add automated tests.