diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index c330817f9..61cbaf178 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -492,7 +492,7 @@ dependencies = [ "base64 0.22.1", "bytes 1.10.1", "chrono", - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=3f3347790d4c7f3393589ef7efb7ccf22405a89a)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-agent", "datadog-trace-normalization", @@ -723,18 +723,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=3f3347790d4c7f3393589ef7efb7ccf22405a89a#3f3347790d4c7f3393589ef7efb7ccf22405a89a" -dependencies = [ - "reqwest", - "rustls", - "rustls-native-certs", - "tracing", -] - -[[package]] -name = "datadog-fips" -version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b1583da00e202985057cf6d48f05e1cac77ab910#b1583da00e202985057cf6d48f05e1cac77ab910" +source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" dependencies = [ "reqwest", "rustls", @@ -771,7 +760,7 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=3f3347790d4c7f3393589ef7efb7ccf22405a89a#3f3347790d4c7f3393589ef7efb7ccf22405a89a" +source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" dependencies = [ "anyhow", "async-trait", @@ -980,9 +969,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b1583da00e202985057cf6d48f05e1cac77ab910#b1583da00e202985057cf6d48f05e1cac77ab910" +source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=b1583da00e202985057cf6d48f05e1cac77ab910)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 58201859f..dc5314059 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,9 +57,9 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" , features = ["compression"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b1583da00e202985057cf6d48f05e1cac77ab910", default-features = false } -datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "3f3347790d4c7f3393589ef7efb7ccf22405a89a" } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "3f3347790d4c7f3393589ef7efb7ccf22405a89a", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } +datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } [dev-dependencies] diff --git a/bottlecap/LICENSE-3rdparty.yml b/bottlecap/LICENSE-3rdparty.yml index b54a2092e..14d2792c5 100644 --- a/bottlecap/LICENSE-3rdparty.yml +++ b/bottlecap/LICENSE-3rdparty.yml @@ -4239,214 +4239,6 @@ third_party_libraries: Copyright 2025 Datadog, Inc. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -- package_name: datadog-fips - package_version: 0.1.0 - repository: https://github.com/DataDog/serverless-components - license: Apache-2.0 - licenses: - - license: Apache-2.0 - text: |2 - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -5071,7 +4863,7 @@ third_party_libraries: same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2025 Datadog, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 054cb92fb..da8c8c456 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -87,7 +87,13 @@ use tracing_subscriber::EnvFilter; struct PendingFlushHandles { trace_flush_handles: FuturesOrdered>>, log_flush_handles: FuturesOrdered>>, - metric_flush_handles: FuturesOrdered, Vec)>>, + metric_flush_handles: FuturesOrdered>, +} + +struct MetricsRetryBatch { + flusher_id: usize, + series: Vec, + sketches: Vec, } impl PendingFlushHandles { @@ -103,7 +109,7 @@ impl PendingFlushHandles { &mut self, logs_flusher: &LogsFlusher, trace_flusher: &ServerlessTraceFlusher, - metrics_flusher: &Arc>, + metrics_flushers: &Arc>>, ) -> bool { let mut joinset = tokio::task::JoinSet::new(); let mut flush_error = false; @@ -152,22 +158,22 @@ impl PendingFlushHandles { } while let Some(retries) = self.metric_flush_handles.next().await { - let mf = metrics_flusher.clone(); + let mf = metrics_flushers.clone(); match retries { - Ok((series, sketches)) => { - if !series.is_empty() || !sketches.is_empty() { + Ok(retry_batch) => { + if !retry_batch.series.is_empty() || !retry_batch.sketches.is_empty() { debug!( "redriving {:?} series and {:?} sketch payloads", - series.len(), - sketches.len() + retry_batch.series.len(), + retry_batch.sketches.len() ); - let series_clone = series.clone(); - let sketches_clone = sketches.clone(); joinset.spawn(async move { - let mut locked_flusher = mf.lock().await; - locked_flusher - .flush_with_retries(Some(series_clone), Some(sketches_clone)) - .await; + let mut locked_flushers = mf.lock().await; + if let Some(flusher) = locked_flushers.get_mut(retry_batch.flusher_id) { + flusher + .flush_metrics(retry_batch.series, retry_batch.sketches) + .await; + } }); } } @@ -437,7 +443,7 @@ async fn extension_loop_active( .expect("failed to create aggregator"), )); - let metrics_flusher = Arc::new(TokioMutex::new(start_metrics_flusher( + let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers( resolved_api_key.clone(), &metrics_aggr, config, @@ -520,26 +526,28 @@ async fn extension_loop_active( } } _ = race_flush_interval.tick() => { - let mut locked_metrics = metrics_flusher.lock().await; + let mut locked_metrics = metrics_flushers.lock().await; blocking_flush_all( &logs_flusher, &mut locked_metrics, &*trace_flusher, &*stats_flusher, &mut race_flush_interval, + &metrics_aggr, ) .await; } } } // flush - let mut locked_metrics = metrics_flusher.lock().await; + let mut locked_metrics = metrics_flushers.lock().await; blocking_flush_all( &logs_flusher, &mut locked_metrics, &*trace_flusher, &*stats_flusher, &mut race_flush_interval, + &metrics_aggr, ) .await; let next_response = next_event(client, &r.extension_id).await; @@ -551,7 +559,7 @@ async fn extension_loop_active( let tf = trace_flusher.clone(); // Await any previous flush handles. This last_continuous_flush_error = pending_flush_handles - .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flusher) + .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flushers) .await; let val = logs_flusher.clone(); @@ -564,27 +572,41 @@ async fn extension_loop_active( .push_back(tokio::spawn(async move { traces_val.flush(None).await.unwrap_or_default() })); - let cloned_metrics_flusher = metrics_flusher.clone(); - pending_flush_handles - .metric_flush_handles - .push_back(tokio::spawn(async move { - cloned_metrics_flusher - .lock() - .await - .flush() + let (metrics_flushers_copy, series, sketches) = { + let locked_metrics = metrics_flushers.lock().await; + let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); + ( + locked_metrics.clone(), + aggregator.consume_metrics(), + aggregator.consume_distributions(), + ) + }; + for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { + let series_clone = series.clone(); + let sketches_clone = sketches.clone(); + let handle = tokio::spawn(async move { + let (retry_series, retry_sketches) = flusher + .flush_metrics(series_clone.clone(), sketches_clone.clone()) .await - .unwrap_or_default() - })); + .unwrap_or_default(); + MetricsRetryBatch { + flusher_id: idx, + series: retry_series, + sketches: retry_sketches, + } + }); + pending_flush_handles.metric_flush_handles.push_back(handle); + } race_flush_interval.reset(); } else if current_flush_decision == FlushDecision::Periodic { - // TODO(astuyve): still await the shutdown flush handles - let mut locked_metrics = metrics_flusher.lock().await; + let mut locked_metrics = metrics_flushers.lock().await; blocking_flush_all( &logs_flusher, &mut locked_metrics, &*trace_flusher, &*stats_flusher, &mut race_flush_interval, + &metrics_aggr, ) .await; last_continuous_flush_error = false; @@ -616,13 +638,14 @@ async fn extension_loop_active( handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; } _ = race_flush_interval.tick() => { - let mut locked_metrics = metrics_flusher.lock().await; + let mut locked_metrics = metrics_flushers.lock().await; blocking_flush_all( &logs_flusher, &mut locked_metrics, &*trace_flusher, &*stats_flusher, &mut race_flush_interval, + &metrics_aggr, ) .await; } @@ -637,7 +660,7 @@ async fn extension_loop_active( // Redrive/block on any failed payloads let tf = trace_flusher.clone(); pending_flush_handles - .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flusher) + .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flushers) .await; // The Shutdown event we get during a timeout will // never include a report log @@ -663,13 +686,14 @@ async fn extension_loop_active( telemetry_listener_cancel_token.cancel(); // gotta lock here - let mut locked_metrics = metrics_flusher.lock().await; + let mut locked_metrics = metrics_flushers.lock().await; blocking_flush_all( &logs_flusher, &mut locked_metrics, &*trace_flusher, &*stats_flusher, &mut race_flush_interval, + &metrics_aggr, ) .await; return Ok(()); @@ -679,14 +703,27 @@ async fn extension_loop_active( async fn blocking_flush_all( logs_flusher: &LogsFlusher, - metrics_flusher: &mut MetricsFlusher, + metrics_flushers: &mut [MetricsFlusher], trace_flusher: &impl TraceFlusher, stats_flusher: &impl StatsFlusher, race_flush_interval: &mut tokio::time::Interval, + metrics_aggr: &Arc>, ) { + let (series, sketches) = { + let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); + ( + aggregator.consume_metrics(), + aggregator.consume_distributions(), + ) + }; + let metrics_futures: Vec<_> = metrics_flushers + .iter_mut() + .map(|f| f.flush_metrics(series.clone(), sketches.clone())) + .collect(); + tokio::join!( logs_flusher.flush(None), - metrics_flusher.flush(), + futures::future::join_all(metrics_futures), trace_flusher.flush(None), stats_flusher.flush() ); @@ -842,11 +879,13 @@ fn start_logs_agent( (logs_agent_channel, logs_flusher) } -fn start_metrics_flusher( +fn start_metrics_flushers( resolved_api_key: String, metrics_aggr: &Arc>, config: &Arc, -) -> MetricsFlusher { +) -> Vec { + let mut flushers = Vec::new(); + let metrics_intake_url = if !config.dd_url.is_empty() { let dd_dd_url = DdDdUrl::new(config.dd_url.clone()).expect("can't parse DD_DD_URL"); @@ -871,7 +910,35 @@ fn start_metrics_flusher( timeout: Duration::from_secs(config.flush_timeout), retry_strategy: DsdRetryStrategy::Immediate(3), }; - MetricsFlusher::new(flusher_config) + flushers.push(MetricsFlusher::new(flusher_config)); + + for (endpoint_url, api_keys) in &config.additional_endpoints { + let dd_url = match DdUrl::new(endpoint_url.clone()) { + Ok(url) => url, + Err(err) => { + error!("Invalid additional endpoint: {err}. Falling back to 'https://app.datadoghq.com'"); + DdUrl::new("https://app.datadoghq.com".to_string()) + .expect("additional endpoint fallback URL is invalid") + } + }; + let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None); + let metrics_intake_url = MetricsIntakeUrlPrefix::new(None, prefix_override) + .expect("can't parse additional endpoint URL"); + + // Create a flusher for each endpoint URL and API key pair + for api_key in api_keys { + let additional_flusher_config = MetricsFlusherConfig { + api_key: api_key.clone(), + aggregator: metrics_aggr.clone(), + metrics_intake_url_prefix: metrics_intake_url.clone(), + https_proxy: config.https_proxy.clone(), + timeout: Duration::from_secs(config.flush_timeout), + retry_strategy: DsdRetryStrategy::Immediate(3), + }; + flushers.push(MetricsFlusher::new(additional_flusher_config)); + } + } + flushers } fn start_trace_agent( diff --git a/bottlecap/src/config/additional_endpoints.rs b/bottlecap/src/config/additional_endpoints.rs new file mode 100644 index 000000000..281229d9c --- /dev/null +++ b/bottlecap/src/config/additional_endpoints.rs @@ -0,0 +1,118 @@ +use serde::{Deserialize, Deserializer}; +use serde_json::Value; +use std::collections::HashMap; +use tracing::error; + +#[allow(clippy::module_name_repetitions)] +pub fn deserialize_additional_endpoints<'de, D>( + deserializer: D, +) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + let value = Value::deserialize(deserializer)?; + + match value { + Value::Object(map) => { + // For YAML format (object) in datadog.yaml + let mut result = HashMap::new(); + for (key, value) in map { + match value { + Value::Array(arr) => { + let urls: Vec = arr + .into_iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect(); + result.insert(key, urls); + } + _ => { + error!("Failed to deserialize additional endpoints - Invalid YAML format: expected array for key {}", key); + } + } + } + Ok(result) + } + Value::String(s) if !s.is_empty() => { + // For JSON format (string) in DD_ADDITIONAL_ENDPOINTS + if let Ok(map) = serde_json::from_str(&s) { + Ok(map) + } else { + error!("Failed to deserialize additional endpoints - Invalid JSON format"); + Ok(HashMap::new()) + } + } + _ => Ok(HashMap::new()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_deserialize_additional_endpoints_yaml() { + // Test YAML format (object) + let input = json!({ + "https://app.datadoghq.com": ["key1", "key2"], + "https://app.datadoghq.eu": ["key3"] + }); + + let result = deserialize_additional_endpoints(input).unwrap(); + + let mut expected = HashMap::new(); + expected.insert( + "https://app.datadoghq.com".to_string(), + vec!["key1".to_string(), "key2".to_string()], + ); + expected.insert( + "https://app.datadoghq.eu".to_string(), + vec!["key3".to_string()], + ); + + assert_eq!(result, expected); + } + + #[test] + fn test_deserialize_additional_endpoints_json() { + // Test JSON string format + let input = json!("{\"https://app.datadoghq.com\":[\"key1\",\"key2\"],\"https://app.datadoghq.eu\":[\"key3\"]}"); + + let result = deserialize_additional_endpoints(input).unwrap(); + + let mut expected = HashMap::new(); + expected.insert( + "https://app.datadoghq.com".to_string(), + vec!["key1".to_string(), "key2".to_string()], + ); + expected.insert( + "https://app.datadoghq.eu".to_string(), + vec!["key3".to_string()], + ); + + assert_eq!(result, expected); + } + + #[test] + fn test_deserialize_additional_endpoints_invalid_or_empty() { + // Test empty YAML + let input = json!({}); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + + // Test empty JSON + let input = json!(""); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + + let input = json!({ + "https://app.datadoghq.com": "invalid-yaml" + }); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + + let input = json!("invalid-json"); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + } +} diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index edd702ed1..671cddc90 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -1,3 +1,4 @@ +use crate::config::additional_endpoints::deserialize_additional_endpoints; use serde::{Deserialize, Deserializer}; use std::collections::HashMap; use std::vec; @@ -75,6 +76,8 @@ pub struct Config { // Metrics overrides pub dd_url: String, pub url: String, + #[serde(deserialize_with = "deserialize_additional_endpoints")] + pub additional_endpoints: HashMap>, // OTLP // // - Traces @@ -172,6 +175,7 @@ impl Default for Config { apm_features: vec![], dd_url: String::default(), url: String::default(), + additional_endpoints: HashMap::new(), // OTLP // // - Receiver diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 541fa83ab..6e784ea20 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -1,3 +1,4 @@ +pub mod additional_endpoints; pub mod apm_replace_rule; pub mod aws; pub mod env; @@ -242,6 +243,15 @@ fn merge_config(config: &mut EnvConfig, yaml_config: &YamlConfig) { .otlp_config_traces_span_name_remappings .clone_from(&yaml_otlp_config_traces_span_name_remappings); } + + // Dual Shipping + // + // - Metrics + if config.additional_endpoints.is_empty() { + config + .additional_endpoints + .clone_from(&yaml_config.additional_endpoints); + } } #[inline] diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index 22e3aa5fe..e82b322cb 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use crate::config::additional_endpoints::deserialize_additional_endpoints; use crate::config::{deserialize_apm_replace_rules, deserialize_processing_rules, ProcessingRule}; use datadog_trace_obfuscation::replacer::ReplaceRule; use serde::Deserialize; @@ -17,6 +18,8 @@ pub struct Config { pub apm_config: ApmConfig, pub proxy: ProxyConfig, pub otlp_config: Option, + #[serde(deserialize_with = "deserialize_additional_endpoints")] + pub additional_endpoints: HashMap>, } impl Config { @@ -221,6 +224,7 @@ pub struct OtlpTracesConfig { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::path::Path; use crate::config::get_config; @@ -250,4 +254,35 @@ mod tests { Ok(()) }); } + + #[test] + fn test_parse_additional_endpoints_from_yaml() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + jail.create_file( + "datadog.yaml", + r#" +additional_endpoints: + "https://app.datadoghq.com": + - apikey2 + - apikey3 + "https://app.datadoghq.eu": + - apikey4 +"#, + )?; + + let config = get_config(Path::new("")).expect("should parse config"); + let mut expected = HashMap::new(); + expected.insert( + "https://app.datadoghq.com".to_string(), + vec!["apikey2".to_string(), "apikey3".to_string()], + ); + expected.insert( + "https://app.datadoghq.eu".to_string(), + vec!["apikey4".to_string()], + ); + assert_eq!(config.additional_endpoints, expected); + Ok(()) + }); + } }