diff --git a/.codecov.yml b/.codecov.yml index 71258fc84b..9157e8eca1 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -39,10 +39,6 @@ component_management: name: ddtelemetry-ffi # this is a display name, and can be changed freely paths: - ddtelemetry-ffi - - component_id: dogstatsd # this is an identifier that should not be changed - name: dogstatsd # this is a display name, and can be changed freely - paths: - - dogstatsd - component_id: dogstatsd-client # this is an identifier that should not be changed name: dogstatsd-client # this is a display name, and can be changed freely paths: diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index ec610f34a6..3dc9293fda 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -24,4 +24,3 @@ sidecar @Datadog/libdatadog-php @Datadog/libdatadog-apm sidecar-ffi @Datadog/libdatadog-php @Datadog/libdatadog-apm data-pipeline*/ @Datadog/libdatadog-apm ddsketch @Datadog/libdatadog-apm @Datadog/libdatadog-telemetry -dogstatsd/ @Datadog/serverless diff --git a/Cargo.lock b/Cargo.lock index 3e2d46c6b3..8476921278 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2073,15 +2073,14 @@ dependencies = [ [[package]] name = "dogstatsd" -version = "16.0.3" +version = "0.1.0" +source = "git+https://github.com/DataDog/serverless-components/?rev=4dfe72ab1850680f41dd79d30a937eb68e7ba6da#4dfe72ab1850680f41dd79d30a937eb68e7ba6da" dependencies = [ "datadog-protos", "ddsketch-agent", "derive_more", "fnv", "hashbrown 0.14.5", - "mockito", - "proptest", "protobuf", "regex", "reqwest", @@ -2091,7 +2090,6 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "tracing-test", "ustr", "zstd", ] @@ -3508,29 +3506,6 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" -[[package]] -name = "mockito" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09b34bd91b9e5c5b06338d392463e1318d683cf82ec3d3af4014609be6e2108d" -dependencies = [ - "assert-json-diff", - "bytes", - "futures-util", - "http 1.1.0", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-util", - "log", - "rand 0.8.5", - "regex", - "serde_json", - "serde_urlencoded", - "similar", - "tokio", -] - [[package]] name = "msvc-demangler" version = "0.10.1" @@ -4123,8 +4098,6 @@ name = "proptest" version = "1.5.0" source = "git+https://github.com/bantonsson/proptest.git?branch=ban/avoid-libm-in-std#9f623fbab7a1a4da487551128c2bffeee2ed6b87" dependencies = [ - "bit-set", - "bit-vec", "bitflags 2.6.0", "lazy_static", "num-traits", @@ -4132,8 +4105,6 @@ dependencies = [ "rand_chacha 0.3.1", "rand_xorshift", "regex-syntax 0.8.5", - "rusty-fork", - "tempfile", "unarray", ] @@ -4376,12 +4347,6 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dc55d7dec32ecaf61e0bd90b3d2392d721a28b95cfd23c3e176eccefbeab2f2" -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - [[package]] name = "quinn" version = "0.11.5" @@ -4845,18 +4810,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" -[[package]] -name = "rusty-fork" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f" -dependencies = [ - "fnv", - "quick-error", - "tempfile", - "wait-timeout", -] - [[package]] name = "ruzstd" version = "0.3.1" @@ -6102,27 +6055,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "tracing-test" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" -dependencies = [ - "tracing-core", - "tracing-subscriber", - "tracing-test-macro", -] - -[[package]] -name = "tracing-test-macro" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" -dependencies = [ - "quote", - "syn 2.0.87", -] - [[package]] name = "try-lock" version = "0.2.5" @@ -6284,15 +6216,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "wait-timeout" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f200f5b12eb75f8c1ed65abd4b2db8a6e1b138a20de009dacee265a2498f3f6" -dependencies = [ - "libc", -] - [[package]] name = "walkdir" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index 5d4dd30d63..12601400e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ members = [ "ddcommon-ffi", "ddtelemetry", "ddtelemetry-ffi", - "dogstatsd", "tools", "ipc", "ipc/macros", diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index a99f02ba9d..b9031f3c9c 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -1,4 +1,4 @@ -root_name: datadog-alloc, builder, build_common, tools, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, datadog-crashtracker-ffi, ddcommon-ffi, datadog-profiling, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-utils, datadog-trace-normalization, tinybytes, dogstatsd-client, datadog-library-config-ffi, datadog-library-config, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-live-debugger, datadog-live-debugger-ffi, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, datadog-trace-obfuscation, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent +root_name: datadog-alloc, builder, build_common, tools, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, datadog-crashtracker-ffi, ddcommon-ffi, datadog-profiling, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-utils, datadog-trace-normalization, tinybytes, dogstatsd-client, datadog-library-config-ffi, datadog-library-config, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-live-debugger, datadog-live-debugger-ffi, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, datadog-trace-obfuscation, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent third_party_libraries: - package_name: addr2line package_version: 0.24.2 @@ -10136,6 +10136,214 @@ third_party_libraries: of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS +- package_name: dogstatsd + package_version: 0.1.0 + repository: '' + license: Apache-2.0 + licenses: + - license: Apache-2.0 + text: |2 + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. - package_name: dyn-clone package_version: 1.0.17 repository: https://github.com/dtolnay/dyn-clone diff --git a/dogstatsd/Cargo.toml b/dogstatsd/Cargo.toml deleted file mode 100644 index 4e321887cc..0000000000 --- a/dogstatsd/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "dogstatsd" -rust-version.workspace = true -edition.workspace = true -version.workspace = true -license.workspace = true - -[lib] -bench = false - -[dependencies] -datadog-protos = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "c89b58e5784b985819baf11f13f7d35876741222" } -ddsketch-agent = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "c89b58e5784b985819baf11f13f7d35876741222" } -derive_more = { version = "1.0.0", features = ["display", "into"] } -hashbrown = { version = "0.14.3", default-features = false, features = ["inline-more"] } -protobuf = { version = "3.5.0", default-features = false } -ustr = { version = "1.0.0", default-features = false } -fnv = { version = "1.0.7", default-features = false } -reqwest = { version = "0.12.4", features = ["json", "http2", "rustls-tls"], default-features = false } -serde = { version = "1.0.197", default-features = false, features = ["derive"] } -serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } -thiserror = { version = "1.0.58", default-features = false } -tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread"] } -tokio-util = { version = "0.7.11", default-features = false } -tracing = { version = "0.1.40", default-features = false } -regex = { version = "1.10.6", default-features = false } -zstd = { version = "0.13.3", default-features = false } - -[dev-dependencies] -mockito = { version = "1.5.0", default-features = false } -proptest = "1.4.0" -tracing-test = { version = "0.2.5", default-features = false } diff --git a/dogstatsd/README.md b/dogstatsd/README.md deleted file mode 100644 index fa5deb7e93..0000000000 --- a/dogstatsd/README.md +++ /dev/null @@ -1,14 +0,0 @@ -# DogStatsD - -Provides a DogStatsD implementation which uses [Saluki](https://github.com/DataDog/saluki) for distribution metrics. - -## Status -This project is in beta and possible frequent changes should be expected. It's primary purpose is for Serverless to send metrics from AWS Lambda Functions, Azure Functions, and Azure Spring Apps. It is still considered unstable for general purposes. - -- No UDS support -- Uses `ustr`, so prone to memory leaks -- Arbitrary constraints in https://github.com/DataDog/libdatadog/blob/main/dogstatsd/src/constants.rs - -## Additional Notes - -Upstreamed from [Bottlecap](https://github.com/DataDog/datadog-lambda-extension/tree/main/bottlecap) diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs deleted file mode 100644 index f31a3b0fc3..0000000000 --- a/dogstatsd/src/aggregator.rs +++ /dev/null @@ -1,755 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -//! The aggregation of metrics. - -use crate::constants; -use crate::datadog::{self, Metric as MetricToShip, Series}; -use crate::errors; -use crate::metric::{self, Metric, MetricValue, SortedTags}; - -use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload}; -use ddsketch_agent::DDSketch; -use hashbrown::hash_table; -use protobuf::Message; -use tracing::{error, warn}; -use ustr::Ustr; - -impl MetricValue { - fn aggregate(&mut self, metric: Metric) { - // safe because we know there's at least one value when we parse - // TODO aggregating different types should return error - match self { - MetricValue::Count(count) => *count += metric.value.get_value().unwrap_or_default(), - MetricValue::Gauge(gauge) => *gauge = metric.value.get_value().unwrap_or_default(), - MetricValue::Distribution(distribution) => { - if let Some(value) = metric.value.get_sketch() { - distribution.merge(value); - } - } - } - } - - pub fn get_value(&self) -> Option { - match self { - MetricValue::Count(count) => Some(*count), - MetricValue::Gauge(gauge) => Some(*gauge), - MetricValue::Distribution(_) => None, - } - } - - pub fn get_sketch(&self) -> Option<&DDSketch> { - match self { - MetricValue::Distribution(distribution) => Some(distribution), - _ => None, - } - } -} - -#[derive(Clone)] -// NOTE by construction we know that intervals and contexts do not explore the -// full space of usize but the type system limits how we can express this today. -pub struct Aggregator { - tags: SortedTags, - map: hash_table::HashTable, - max_batch_entries_single_metric: usize, - max_batch_bytes_single_metric: u64, - max_batch_entries_sketch_metric: usize, - max_batch_bytes_sketch_metric: u64, - max_context: usize, -} - -impl Aggregator { - /// Create a new instance of `Aggregator` - /// - /// # Errors - /// - /// Will fail at runtime if the type `INTERVALS` and `CONTEXTS` exceed their - /// counterparts in `constants`. This would be better as a compile-time - /// issue, although leaving this open allows for runtime configuration. - #[allow(clippy::cast_precision_loss)] - pub fn new(tags: SortedTags, max_context: usize) -> Result { - if max_context > constants::MAX_CONTEXTS { - return Err(errors::Creation::Contexts); - } - Ok(Self { - tags, - map: hash_table::HashTable::new(), - max_batch_entries_single_metric: constants::MAX_ENTRIES_SINGLE_METRIC, - max_batch_bytes_single_metric: constants::MAX_SIZE_BYTES_SINGLE_METRIC, - max_batch_entries_sketch_metric: constants::MAX_ENTRIES_SKETCH_METRIC, - max_batch_bytes_sketch_metric: constants::MAX_SIZE_SKETCH_METRIC, - max_context, - }) - } - - /// Insert a `Metric` into the `Aggregator` at the current interval - /// - /// # Errors - /// - /// Function will return overflow error if more than - /// `min(constants::MAX_CONTEXTS, CONTEXTS)` is exceeded. - pub fn insert(&mut self, metric: Metric) -> Result<(), errors::Insert> { - let id = metric::id(metric.name, &metric.tags, metric.timestamp); - let len = self.map.len(); - - match self.map.entry( - id, - |m| m.id == id, - |m| metric::id(m.name, &m.tags, m.timestamp), - ) { - hash_table::Entry::Vacant(entry) => { - if len >= self.max_context { - return Err(errors::Insert::Overflow); - } - entry.insert(metric); - } - hash_table::Entry::Occupied(mut entry) => { - entry.get_mut().value.aggregate(metric); - } - } - Ok(()) - } - - pub fn clear(&mut self) { - self.map.clear(); - } - - #[must_use] - pub fn distributions_to_protobuf(&self) -> SketchPayload { - let mut sketch_payload = SketchPayload::new(); - - sketch_payload.sketches = self - .map - .iter() - .filter_map(|entry| match entry.value { - MetricValue::Distribution(_) => build_sketch(entry, self.tags.clone()), - _ => None, - }) - .collect(); - sketch_payload - } - - #[must_use] - pub fn consume_distributions(&mut self) -> Vec { - let mut batched_payloads = Vec::new(); - let mut sketch_payload = SketchPayload::new(); - let mut this_batch_size = 0u64; - for sketch in self - .map - .extract_if(|entry| { - if let MetricValue::Distribution(_) = entry.value { - return true; - } - false - }) - .filter_map(|entry| build_sketch(&entry, self.tags.clone())) - { - let next_chunk_size = sketch.compute_size(); - - if (sketch_payload.sketches.len() >= self.max_batch_entries_sketch_metric) - || (this_batch_size + next_chunk_size >= self.max_batch_bytes_sketch_metric) - { - if this_batch_size == 0 { - warn!("Only one distribution exceeds max batch size, adding it anyway: {:?} with {}", sketch.metric, next_chunk_size); - } else { - batched_payloads.push(sketch_payload); - sketch_payload = SketchPayload::new(); - this_batch_size = 0u64; - } - } - this_batch_size += next_chunk_size; - sketch_payload.sketches.push(sketch); - } - if !sketch_payload.sketches.is_empty() { - batched_payloads.push(sketch_payload); - } - batched_payloads - } - - #[must_use] - pub fn to_series(&self) -> Series { - let mut series_payload = Series { - series: Vec::with_capacity(1_024), - }; - - self.map - .iter() - .filter_map(|entry| match entry.value { - MetricValue::Distribution(_) => None, - _ => build_metric(entry, self.tags.clone()), - }) - .for_each(|metric| series_payload.series.push(metric)); - series_payload - } - - #[must_use] - pub fn consume_metrics(&mut self) -> Vec { - let mut batched_payloads = Vec::new(); - let mut series_payload = Series { - series: Vec::with_capacity(1_024), - }; - let mut this_batch_size = 0u64; - for metric in self - .map - .extract_if(|entry| { - if let MetricValue::Distribution(_) = entry.value { - return false; - } - true - }) - .filter_map(|entry| build_metric(&entry, self.tags.clone())) - { - // TODO serialization is made twice for each point. If we return a Vec we can avoid - // that - let serialized_metric_size = match serde_json::to_vec(&metric) { - Ok(serialized_metric) => serialized_metric.len() as u64, - Err(e) => { - error!("failed to serialize metric: {:?}", e); - 0u64 - } - }; - - if serialized_metric_size > 0 { - if (series_payload.series.len() >= self.max_batch_entries_single_metric) - || (this_batch_size + serialized_metric_size - >= self.max_batch_bytes_single_metric) - { - if this_batch_size == 0 { - warn!("Only one metric exceeds max batch size, adding it anyway: {:?} with {}", metric.metric, serialized_metric_size); - } else { - batched_payloads.push(series_payload); - series_payload = Series { - series: Vec::with_capacity(1_024), - }; - this_batch_size = 0u64; - } - } - series_payload.series.push(metric); - this_batch_size += serialized_metric_size; - } - } - - if !series_payload.series.is_empty() { - batched_payloads.push(series_payload); - } - batched_payloads - } - - pub fn get_entry_by_id( - &self, - name: Ustr, - tags: &Option, - timestamp: i64, - ) -> Option<&Metric> { - let id = metric::id(name, tags, timestamp); - self.map.find(id, |m| m.id == id) - } -} - -fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option { - let sketch = entry.value.get_sketch()?; - let mut dogsketch = Dogsketch::default(); - sketch.merge_to_dogsketch(&mut dogsketch); - // TODO(Astuyve) allow users to specify timestamp - dogsketch.set_ts(entry.timestamp); - let mut sketch = Sketch::default(); - sketch.set_dogsketches(vec![dogsketch]); - let name = entry.name.to_string(); - sketch.set_metric(name.clone().into()); - if let Some(tags) = entry.tags.clone() { - base_tag_vec.extend(&tags); - } - sketch.set_tags(base_tag_vec.to_chars()); - Some(sketch) -} - -fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option { - let resources; - if let Some(tags) = entry.tags.clone() { - resources = tags.to_resources(); - } else { - resources = Vec::new(); - } - let kind = match entry.value { - MetricValue::Count(_) => datadog::DdMetricKind::Count, - MetricValue::Gauge(_) => datadog::DdMetricKind::Gauge, - MetricValue::Distribution(_) => unreachable!(), - }; - let point = datadog::Point { - value: entry.value.get_value()?, - // TODO(astuyve) allow user to specify timestamp - timestamp: entry.timestamp as u64, - }; - - if let Some(tags) = entry.tags.clone() { - base_tag_vec.extend(&tags); - } - - Some(MetricToShip { - metric: entry.name.as_str(), - resources, - kind, - points: [point; 1], - tags: base_tag_vec.to_strings(), - }) -} - -#[cfg(test)] -#[allow(clippy::unwrap_used)] -pub mod tests { - use crate::aggregator::Aggregator; - use crate::metric; - use crate::metric::{parse, SortedTags, EMPTY_TAGS}; - use datadog_protos::metrics::SketchPayload; - use hashbrown::hash_table; - use protobuf::Message; - use std::sync::Mutex; - - const PRECISION: f64 = 0.000_000_01; - - const SINGLE_METRIC_SIZE: usize = 193; // taken from the test, size of a serialized metric with one tag and 1 digit counter value - const SINGLE_DISTRIBUTION_SIZE: u64 = 140; - const DEFAULT_TAGS: &str = - "dd_extension_version:63-next,architecture:x86_64,_dd.compute_stats:1"; - - pub fn assert_value( - aggregator_mutex: &Mutex, - metric_id: &str, - value: f64, - tags: &str, - timestamp: i64, - ) { - let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = aggregator.get_entry_by_id( - metric_id.into(), - &Some(SortedTags::parse(tags).unwrap()), - timestamp, - ) { - let metric = e.value.get_value().unwrap(); - assert!((metric - value).abs() < PRECISION); - } else { - panic!("{}", format!("{metric_id} not found")); - } - } - - pub fn assert_sketch( - aggregator_mutex: &Mutex, - metric_id: &str, - value: f64, - timestamp: i64, - ) { - let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None, timestamp) { - let metric = e.value.get_sketch().unwrap(); - assert!((metric.max().unwrap() - value).abs() < PRECISION); - assert!((metric.min().unwrap() - value).abs() < PRECISION); - assert!((metric.sum().unwrap() - value).abs() < PRECISION); - assert!((metric.avg().unwrap() - value).abs() < PRECISION); - } else { - panic!("{}", format!("{metric_id} not found")); - } - } - - #[test] - #[cfg_attr(miri, ignore)] - fn insertion() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - - let metric1 = parse("test:1|c|#k:v").expect("metric parse failed"); - let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed"); - - assert!(aggregator.insert(metric1).is_ok()); - assert!(aggregator.insert(metric2).is_ok()); - - // Both unique contexts get one slot. - assert_eq!(aggregator.map.len(), 2); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn distribution_insertion() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - - let metric1 = parse("test:1|d|#k:v").expect("metric parse failed"); - let metric2 = parse("foo:1|d|#k:v").expect("metric parse failed"); - - assert!(aggregator.insert(metric1).is_ok()); - assert!(aggregator.insert(metric2).is_ok()); - - // Both unique contexts get one slot. - assert_eq!(aggregator.map.len(), 2); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn overflow() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let mut now = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - now = (now / 10) * 10; - let metric1 = parse("test:1|c|#k:v").expect("metric parse failed"); - let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed"); - let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed"); - - let id1 = metric::id(metric1.name, &metric1.tags, now); - let id2 = metric::id(metric2.name, &metric2.tags, now); - let id3 = metric::id(metric3.name, &metric3.tags, now); - - assert_ne!(id1, id2); - assert_ne!(id1, id3); - assert_ne!(id2, id3); - - assert!(aggregator.insert(metric1).is_ok()); - assert_eq!(aggregator.map.len(), 1); - - assert!(aggregator.insert(metric2.clone()).is_ok()); - assert!(aggregator.insert(metric2.clone()).is_ok()); - assert!(aggregator.insert(metric2).is_ok()); - assert_eq!(aggregator.map.len(), 2); - - assert!(aggregator.insert(metric3).is_err()); - assert_eq!(aggregator.map.len(), 2); - } - - #[test] - #[allow(clippy::float_cmp)] - #[cfg_attr(miri, ignore)] - fn clear() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let mut now = 1656581409; - now = (now / 10) * 10; - let metric1 = parse("test:3|c|#k1:v1|T1656581409").expect("metric parse failed"); - let metric2 = parse("foo:5|c|#k2:v2|T1656581409").expect("metric parse failed"); - - assert!(aggregator.insert(metric1).is_ok()); - assert!(aggregator.insert(metric2).is_ok()); - - assert_eq!(aggregator.map.len(), 2); - if let Some(v) = aggregator.get_entry_by_id( - "foo".into(), - &Some(SortedTags::parse("k2:v2").unwrap()), - now, - ) { - assert_eq!(v.value.get_value().unwrap(), 5f64); - } else { - panic!("failed to get value by id"); - } - - if let Some(v) = aggregator.get_entry_by_id( - "test".into(), - &Some(SortedTags::parse("k1:v1").unwrap()), - now, - ) { - assert_eq!(v.value.get_value().unwrap(), 3f64); - } else { - panic!("failed to get value by id"); - } - - aggregator.clear(); - assert_eq!(aggregator.map.len(), 0); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn to_series() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - - let metric1 = parse("test:1|c|#k1:v1,k2:v2").expect("metric parse failed"); - let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed"); - let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed"); - - assert!(aggregator.insert(metric1).is_ok()); - assert!(aggregator.insert(metric2).is_ok()); - - assert_eq!(aggregator.map.len(), 2); - assert_eq!(aggregator.to_series().len(), 2); - // to_series should not mutate the state - assert_eq!(aggregator.map.len(), 2); - assert_eq!(aggregator.to_series().len(), 2); - assert_eq!(aggregator.map.len(), 2); - - assert!(aggregator.insert(metric3).is_err()); - assert_eq!(aggregator.to_series().len(), 2); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn distributions_to_protobuf() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - - let metric1 = parse("test:1|d|#k:v").expect("metric parse failed"); - let metric2 = parse("foo:1|d|#k:v").expect("metric parse failed"); - - assert!(aggregator.insert(metric1).is_ok()); - assert!(aggregator.insert(metric2).is_ok()); - - assert_eq!(aggregator.map.len(), 2); - assert_eq!(aggregator.distributions_to_protobuf().sketches().len(), 2); - assert_eq!(aggregator.map.len(), 2); - assert_eq!(aggregator.distributions_to_protobuf().sketches().len(), 2); - assert_eq!(aggregator.map.len(), 2); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_distributions_ignore_single_metrics() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 1_000).unwrap(); - assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); - - assert!(aggregator - .insert(parse("test1:1|d|#k:v".to_string().as_str()).expect("metric parse failed")) - .is_ok()); - assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 1); - - assert!(aggregator - .insert(parse("foo:1|c|#k:v").expect("metric parse failed")) - .is_ok()); - assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 1); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_distributions_batch_entries() { - let max_batch = 5; - let tot = 12; - let mut aggregator = Aggregator { - tags: EMPTY_TAGS, - map: hash_table::HashTable::new(), - max_batch_entries_single_metric: 1_000, - max_batch_bytes_single_metric: 1_000, - max_batch_entries_sketch_metric: max_batch, - max_batch_bytes_sketch_metric: 1_500, - max_context: 1_000, - }; - - add_metrics(tot, &mut aggregator, "d".to_string()); - let batched = aggregator.consume_distributions(); - assert_eq!(aggregator.consume_distributions().len(), 0); - - assert_eq!(batched.len(), 3); - assert_eq!(batched.first().unwrap().sketches.len(), max_batch); - assert_eq!(batched.get(1).unwrap().sketches.len(), max_batch); - assert_eq!(batched.get(2).unwrap().sketches.len(), tot - max_batch * 2); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_distributions_batch_bytes() { - let expected_distribution_per_batch = 2; - let total_number_of_distributions = 5; - let max_bytes = SINGLE_DISTRIBUTION_SIZE * expected_distribution_per_batch as u64; - let mut aggregator = Aggregator { - tags: to_sorted_tags(), - map: hash_table::HashTable::new(), - max_batch_entries_single_metric: 1_000, - max_batch_bytes_single_metric: 1_000, - max_batch_entries_sketch_metric: 1_000, - max_batch_bytes_sketch_metric: max_bytes, - max_context: 1_000, - }; - - add_metrics( - total_number_of_distributions, - &mut aggregator, - "d".to_string(), - ); - let batched = aggregator.consume_distributions(); - - assert_eq!( - batched.len(), - total_number_of_distributions / expected_distribution_per_batch + 1 - ); - assert_eq!( - batched.first().unwrap().compute_size(), - SINGLE_DISTRIBUTION_SIZE * expected_distribution_per_batch as u64 - ); - assert_eq!( - batched.get(1).unwrap().compute_size(), - SINGLE_DISTRIBUTION_SIZE * expected_distribution_per_batch as u64 - ); - assert_eq!( - batched.get(2).unwrap().compute_size(), - SINGLE_DISTRIBUTION_SIZE - ); - } - - fn to_sorted_tags() -> SortedTags { - SortedTags::parse(DEFAULT_TAGS).unwrap() - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_distribution_one_element_bigger_than_max_size() { - let max_bytes = 1; - let tot = 5; - let mut aggregator = Aggregator { - tags: to_sorted_tags(), - map: hash_table::HashTable::new(), - max_batch_entries_single_metric: 1_000, - max_batch_bytes_single_metric: 1_000, - max_batch_entries_sketch_metric: 1_000, - max_batch_bytes_sketch_metric: max_bytes, - max_context: 1_000, - }; - - add_metrics(tot, &mut aggregator, "d".to_string()); - let batched = aggregator.consume_distributions(); - - assert_eq!(batched.len(), tot); - for a_batch in batched { - assert_eq!(a_batch.compute_size(), SINGLE_DISTRIBUTION_SIZE); - } - } - - fn add_metrics(tot: usize, aggregator: &mut Aggregator, counter_or_distro: String) { - for i in 1..=tot { - assert!(aggregator - .insert( - parse(format!("test{i}:{i}|{counter_or_distro}|#k:v").as_str()) - .expect("metric parse failed") - ) - .is_ok()); - } - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_series_ignore_distribution() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 1_000).unwrap(); - - assert_eq!(aggregator.consume_metrics().len(), 0); - - assert!(aggregator - .insert(parse("test1:1|c|#k:v".to_string().as_str()).expect("metric parse failed")) - .is_ok()); - assert_eq!(aggregator.consume_distributions().len(), 0); - assert_eq!(aggregator.consume_metrics().len(), 1); - assert_eq!(aggregator.consume_metrics().len(), 0); - - assert!(aggregator - .insert(parse("test1:1|c|#k:v".to_string().as_str()).expect("metric parse failed")) - .is_ok()); - assert!(aggregator - .insert(parse("foo:1|d|#k:v").expect("metric parse failed")) - .is_ok()); - assert_eq!(aggregator.consume_metrics().len(), 1); - assert_eq!(aggregator.consume_distributions().len(), 1); - assert_eq!(aggregator.consume_distributions().len(), 0); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_series_batch_entries() { - let max_batch = 5; - let tot = 13; - let mut aggregator = Aggregator { - tags: EMPTY_TAGS, - map: hash_table::HashTable::new(), - max_batch_entries_single_metric: max_batch, - max_batch_bytes_single_metric: 10_000, - max_batch_entries_sketch_metric: 1_000, - max_batch_bytes_sketch_metric: 1_500, - max_context: 1_000, - }; - - add_metrics(tot, &mut aggregator, "c".to_string()); - - let batched = aggregator.consume_metrics(); - assert_eq!(batched.len(), 3); - assert_eq!(batched.first().unwrap().series.len(), max_batch); - assert_eq!(batched.get(1).unwrap().series.len(), max_batch); - assert_eq!(batched.get(2).unwrap().series.len(), tot - max_batch * 2); - - assert_eq!(aggregator.consume_metrics().len(), 0); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_metrics_batch_bytes() { - let expected_metrics_per_batch = 2; - let total_number_of_metrics = 5; - let two_metrics_size = 374; - let max_bytes = SINGLE_METRIC_SIZE * expected_metrics_per_batch + 13; - let mut aggregator = Aggregator { - tags: to_sorted_tags(), - map: hash_table::HashTable::new(), - max_batch_entries_single_metric: 1_000, - max_batch_bytes_single_metric: max_bytes as u64, - max_batch_entries_sketch_metric: 1_000, - max_batch_bytes_sketch_metric: 1_000, - max_context: 1_000, - }; - - add_metrics(total_number_of_metrics, &mut aggregator, "c".to_string()); - let batched = aggregator.consume_metrics(); - - assert_eq!( - batched.len(), - total_number_of_metrics / expected_metrics_per_batch + 1 - ); - assert_eq!( - serde_json::to_vec(batched.first().unwrap()).unwrap().len(), - two_metrics_size - ); - assert_eq!( - serde_json::to_vec(batched.get(1).unwrap()).unwrap().len(), - two_metrics_size - ); - assert_eq!( - serde_json::to_vec(batched.get(2).unwrap()).unwrap().len(), - SINGLE_METRIC_SIZE - ); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn consume_series_one_element_bigger_than_max_size() { - let max_bytes = 1; - let tot = 5; - let mut aggregator = Aggregator { - tags: to_sorted_tags(), - map: hash_table::HashTable::new(), - max_batch_entries_single_metric: 1_000, - max_batch_bytes_single_metric: max_bytes, - max_batch_entries_sketch_metric: 1_000, - max_batch_bytes_sketch_metric: 1_000, - max_context: 1_000, - }; - - add_metrics(tot, &mut aggregator, "c".to_string()); - let batched = aggregator.consume_metrics(); - - assert_eq!(batched.len(), tot); - for a_batch in batched { - assert_eq!( - serde_json::to_vec(&a_batch).unwrap().len(), - SINGLE_METRIC_SIZE - ); - } - } - - #[test] - #[cfg_attr(miri, ignore)] - fn distribution_serialized_deserialized() { - let mut aggregator = Aggregator::new(EMPTY_TAGS, 1_000).unwrap(); - - add_metrics(10, &mut aggregator, "d".to_string()); - let distribution = aggregator.distributions_to_protobuf(); - assert_eq!(distribution.sketches().len(), 10); - - let serialized = distribution - .write_to_bytes() - .expect("Can't serialized proto"); - - let deserialized = - SketchPayload::parse_from_bytes(serialized.as_slice()).expect("failed to parse proto"); - - assert_eq!(deserialized.sketches().len(), 10); - assert_eq!(deserialized, distribution); - } -} diff --git a/dogstatsd/src/constants.rs b/dogstatsd/src/constants.rs deleted file mode 100644 index adf263167e..0000000000 --- a/dogstatsd/src/constants.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -/// The maximum tags that a `Metric` may hold. -pub const MAX_TAGS: usize = 100; - -pub const CONTEXTS: usize = 10_240; - -pub static MAX_CONTEXTS: usize = 65_536; // 2**16, arbitrary - -const MB: u64 = 1_024 * 1_024; - -pub(crate) const MAX_ENTRIES_SINGLE_METRIC: usize = 1_000; - -pub(crate) const MAX_SIZE_BYTES_SINGLE_METRIC: u64 = 5 * MB; - -pub(crate) const MAX_ENTRIES_SKETCH_METRIC: usize = 1_000; - -pub(crate) const MAX_SIZE_SKETCH_METRIC: u64 = 62 * MB; diff --git a/dogstatsd/src/datadog.rs b/dogstatsd/src/datadog.rs deleted file mode 100644 index 4bbcd13de3..0000000000 --- a/dogstatsd/src/datadog.rs +++ /dev/null @@ -1,438 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -//!Types to serialize data into the Datadog API - -use crate::flusher::ShippingError; -use datadog_protos::metrics::SketchPayload; -use derive_more::{Display, Into}; -use protobuf::Message; -use regex::Regex; -use reqwest; -use reqwest::{Client, Response}; -use serde::{Serialize, Serializer}; -use serde_json; -use std::io::Write; -use std::sync::OnceLock; -use std::time::Duration; -use tracing::{debug, error}; -use zstd::stream::write::Encoder; - -// TODO: Move to the more ergonomic LazyLock when MSRV is 1.80 -static SITE_RE: OnceLock = OnceLock::new(); -fn get_site_re() -> &'static Regex { - #[allow(clippy::expect_used)] - SITE_RE.get_or_init(|| Regex::new(r"^[a-zA-Z0-9._:-]+$").expect("invalid regex")) -} -static URL_PREFIX_RE: OnceLock = OnceLock::new(); -fn get_url_prefix_re() -> &'static Regex { - #[allow(clippy::expect_used)] - URL_PREFIX_RE.get_or_init(|| Regex::new(r"^https?://[a-zA-Z0-9._:-]+$").expect("invalid regex")) -} - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Display, Into)] -pub struct Site(String); - -#[derive(thiserror::Error, Debug, Clone, PartialEq)] -#[error("Invalid site: {0}")] -pub struct SiteError(String); - -impl Site { - pub fn new(site: String) -> Result { - // Datadog sites are generally domain names. In particular, they shouldn't have any slashes - // in them. We expect this to be coming from a `DD_SITE` environment variable or the `site` - // config field. - if get_site_re().is_match(&site) { - Ok(Site(site)) - } else { - Err(SiteError(site)) - } - } -} - -#[derive(thiserror::Error, Debug, Clone, PartialEq)] -#[error("Invalid URL prefix: {0}")] -pub struct UrlPrefixError(String); - -fn validate_url_prefix(prefix: &str) -> Result<(), UrlPrefixError> { - if get_url_prefix_re().is_match(prefix) { - Ok(()) - } else { - Err(UrlPrefixError(prefix.to_owned())) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Display, Into)] -pub struct DdUrl(String); - -impl DdUrl { - pub fn new(prefix: String) -> Result { - validate_url_prefix(&prefix)?; - Ok(Self(prefix)) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Display, Into)] -pub struct DdDdUrl(String); - -impl DdDdUrl { - pub fn new(prefix: String) -> Result { - validate_url_prefix(&prefix)?; - Ok(Self(prefix)) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Display, Into)] -pub struct MetricsIntakeUrlPrefixOverride(String); - -impl MetricsIntakeUrlPrefixOverride { - pub fn maybe_new(dd_url: Option, dd_dd_url: Option) -> Option { - match (dd_url, dd_dd_url) { - (None, None) => None, - (_, Some(dd_dd_url)) => Some(Self(dd_dd_url.into())), - (Some(dd_url), None) => Some(Self(dd_url.into())), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Display)] -pub struct MetricsIntakeUrlPrefix(String); - -#[derive(thiserror::Error, Debug, Clone, PartialEq)] -#[error("Missing intake URL configuration")] -pub struct MissingIntakeUrlError; - -impl MetricsIntakeUrlPrefix { - #[inline] - pub fn new( - site: Option, - overridden_prefix: Option, - ) -> Result { - match (site, overridden_prefix) { - (None, None) => Err(MissingIntakeUrlError), - (_, Some(prefix)) => Ok(Self::new_expect_validated(prefix.into())), - (Some(site), None) => Ok(Self::from_site(site)), - } - } - - #[inline] - fn new_expect_validated(validated_prefix: String) -> Self { - #[allow(clippy::expect_used)] - validate_url_prefix(&validated_prefix).expect("Invalid URL prefix"); - - Self(validated_prefix) - } - - #[inline] - fn from_site(site: Site) -> Self { - Self(format!("https://api.{site}")) - } -} - -/// Interface for the `DogStatsD` metrics intake API. -#[derive(Debug, Clone)] -pub struct DdApi { - api_key: String, - metrics_intake_url_prefix: MetricsIntakeUrlPrefix, - client: Option, - retry_strategy: RetryStrategy, -} - -impl DdApi { - #[must_use] - pub fn new( - api_key: String, - metrics_intake_url_prefix: MetricsIntakeUrlPrefix, - https_proxy: Option, - timeout: Duration, - retry_strategy: RetryStrategy, - ) -> Self { - let client = build_client(https_proxy, timeout) - .inspect_err(|e| { - error!("Unable to create client {:?}", e); - }) - .ok(); - DdApi { - api_key, - metrics_intake_url_prefix, - client, - retry_strategy, - } - } - - /// Ship a serialized series to the API, blocking - pub async fn ship_series(&self, series: &Series) -> Result { - let url = format!("{}/api/v2/series", &self.metrics_intake_url_prefix); - let safe_body = serde_json::to_vec(&series) - .map_err(|e| ShippingError::Payload(format!("Failed to serialize series: {e}")))?; - debug!("Sending body: {:?}", &series); - self.ship_data(url, safe_body, "application/json").await - } - - pub async fn ship_distributions( - &self, - sketches: &SketchPayload, - ) -> Result { - let url = format!("{}/api/beta/sketches", &self.metrics_intake_url_prefix); - let safe_body = sketches - .write_to_bytes() - .map_err(|e| ShippingError::Payload(format!("Failed to serialize series: {e}")))?; - debug!("Sending distributions: {:?}", &sketches); - self.ship_data(url, safe_body, "application/x-protobuf") - .await - // TODO maybe go to coded output stream if we incrementally - // add sketch payloads to the buffer - // something like this, but fix the utf-8 encoding issue - // { - // let mut output_stream = CodedOutputStream::vec(&mut buf); - // let _ = output_stream.write_tag(1, protobuf::rt::WireType::LengthDelimited); - // let _ = output_stream.write_message_no_tag(&sketches); - // TODO not working, has utf-8 encoding issue in dist-intake - //} - } - - async fn ship_data( - &self, - url: String, - body: Vec, - content_type: &str, - ) -> Result { - let client = &self - .client - .as_ref() - .ok_or_else(|| ShippingError::Destination(None, "No client".to_string()))?; - let start = std::time::Instant::now(); - - let result = (|| -> std::io::Result> { - let mut encoder = Encoder::new(Vec::new(), 6)?; - encoder.write_all(&body)?; - encoder.finish() - })(); - - let mut builder = client - .post(&url) - .header("DD-API-KEY", &self.api_key) - .header("Content-Type", content_type); - - builder = match result { - Ok(compressed) => builder.header("Content-Encoding", "zstd").body(compressed), - Err(err) => { - debug!("Sending uncompressed data, failed to compress: {err}"); - builder.body(body) - } - }; - - let resp = self.send_with_retry(builder).await; - - let elapsed = start.elapsed(); - debug!("Request to {} took {}ms", url, elapsed.as_millis()); - resp - } - - async fn send_with_retry( - &self, - builder: reqwest::RequestBuilder, - ) -> Result { - let mut attempts = 0; - loop { - attempts += 1; - let cloned_builder = match builder.try_clone() { - Some(b) => b, - None => { - return Err(ShippingError::Destination( - None, - "Failed to clone request".to_string(), - )); - } - }; - - let response = cloned_builder.send().await; - match response { - Ok(response) if response.status().is_success() => { - return Ok(response); - } - _ => {} - } - - match self.retry_strategy { - RetryStrategy::LinearBackoff(max_attempts, _) - | RetryStrategy::Immediate(max_attempts) - if attempts >= max_attempts => - { - let status = match response { - Ok(response) => Some(response.status()), - Err(err) => err.status(), - }; - // handle if status code missing like timeout - return Err(ShippingError::Destination( - status, - format!("Failed to send request after {} attempts", max_attempts) - .to_string(), - )); - } - RetryStrategy::LinearBackoff(_, delay) => { - tokio::time::sleep(Duration::from_millis(delay)).await; - } - _ => {} - } - } - } -} - -#[derive(Debug, Clone)] -pub enum RetryStrategy { - Immediate(u64), // attempts - LinearBackoff(u64, u64), // attempts, delay -} - -fn build_client(https_proxy: Option, timeout: Duration) -> Result { - let mut builder = Client::builder().timeout(timeout); - if let Some(proxy) = https_proxy { - builder = builder.proxy(reqwest::Proxy::https(proxy)?); - } - builder.build() -} - -#[derive(Debug, Serialize, Clone, Copy)] -/// A single point in time -pub(crate) struct Point { - /// The time at which the point exists - pub(crate) timestamp: u64, - /// The point's value - pub(crate) value: f64, -} - -#[derive(Debug, Serialize)] -/// A named resource -pub(crate) struct Resource { - /// The name of this resource - pub(crate) name: String, - #[serde(rename = "type")] - /// The kind of this resource - pub(crate) kind: String, -} - -#[derive(Debug, Clone, Copy)] -/// The kinds of metrics the Datadog API supports -pub(crate) enum DdMetricKind { - /// An accumulating sum - Count, - /// An instantaneous value - Gauge, -} - -impl Serialize for DdMetricKind { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match *self { - DdMetricKind::Count => serializer.serialize_u32(0), - DdMetricKind::Gauge => serializer.serialize_u32(1), - } - } -} - -#[derive(Debug, Serialize)] -#[allow(clippy::struct_field_names)] -/// A named collection of `Point` instances. -pub(crate) struct Metric { - /// The name of the point collection - pub(crate) metric: &'static str, - /// The collection of points - pub(crate) points: [Point; 1], - /// The resources associated with the points - pub(crate) resources: Vec, - #[serde(rename = "type")] - /// The kind of metric - pub(crate) kind: DdMetricKind, - pub(crate) tags: Vec, -} - -#[derive(Debug, Serialize)] -/// A collection of metrics as defined by the Datadog Metrics API. -// NOTE we have a number of `Vec` instances in this implementation that could -// otherwise be arrays, given that we have constants. Serializing to JSON would -// require us to avoid serializing None or Uninit values, so there's some custom -// work that's needed. For protobuf this more or less goes away. -pub struct Series { - /// The collection itself - pub(crate) series: Vec, -} - -impl Series { - #[cfg(test)] - pub(crate) fn len(&self) -> usize { - self.series.len() - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn override_can_be_empty() { - assert_eq!(MetricsIntakeUrlPrefixOverride::maybe_new(None, None), None); - } - - #[test] - fn override_prefers_dd_dd_url() { - assert_eq!( - MetricsIntakeUrlPrefixOverride::maybe_new( - Some(DdUrl::new("http://a_dd_url".to_string()).unwrap()), - Some(DdDdUrl::new("https://a_dd_dd_url".to_string()).unwrap()) - ), - Some(MetricsIntakeUrlPrefixOverride( - "https://a_dd_dd_url".to_string() - )) - ); - } - - #[test] - fn override_will_take_dd_url() { - assert_eq!( - MetricsIntakeUrlPrefixOverride::maybe_new( - Some(DdUrl::new("http://a_dd_url".to_string()).unwrap()), - None - ), - Some(MetricsIntakeUrlPrefixOverride( - "http://a_dd_url".to_string() - )) - ); - } - - #[test] - fn test_intake_url_prefix_new_requires_something() { - assert_eq!( - MetricsIntakeUrlPrefix::new(None, None), - Err(MissingIntakeUrlError) - ); - } - - #[test] - fn test_intake_url_prefix_new_picks_the_override() { - assert_eq!( - MetricsIntakeUrlPrefix::new( - Some(Site::new("a_site".to_string()).unwrap()), - MetricsIntakeUrlPrefixOverride::maybe_new( - Some(DdUrl::new("http://a_dd_url".to_string()).unwrap()), - None - ), - ), - Ok(MetricsIntakeUrlPrefix::new_expect_validated( - "http://a_dd_url".to_string() - )) - ); - } - - #[test] - fn test_intake_url_prefix_new_picks_site_as_a_fallback() { - assert_eq!( - MetricsIntakeUrlPrefix::new(Some(Site::new("a_site".to_string()).unwrap()), None,), - Ok(MetricsIntakeUrlPrefix::new_expect_validated( - "https://api.a_site".to_string() - )) - ); - } -} diff --git a/dogstatsd/src/dogstatsd.rs b/dogstatsd/src/dogstatsd.rs deleted file mode 100644 index ea8b3c802e..0000000000 --- a/dogstatsd/src/dogstatsd.rs +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use std::net::SocketAddr; -use std::str::Split; -use std::sync::{Arc, Mutex}; - -use crate::aggregator::Aggregator; -use crate::errors::ParseError::UnsupportedType; -use crate::metric::{parse, Metric}; -use tracing::{debug, error}; - -pub struct DogStatsD { - cancel_token: tokio_util::sync::CancellationToken, - aggregator: Arc>, - buffer_reader: BufferReader, -} - -pub struct DogStatsDConfig { - pub host: String, - pub port: u16, -} - -enum BufferReader { - UdpSocketReader(tokio::net::UdpSocket), - #[allow(dead_code)] - MirrorReader(Vec, SocketAddr), -} - -impl BufferReader { - async fn read(&self) -> std::io::Result<(Vec, SocketAddr)> { - match self { - BufferReader::UdpSocketReader(socket) => { - // TODO(astuyve) this should be dynamic - // Max buffer size is configurable in Go Agent and the default is 8KB - // https://github.com/DataDog/datadog-agent/blob/85939a62b5580b2a15549f6936f257e61c5aa153/pkg/config/config_template.yaml#L2154-L2158 - let mut buf = [0; 8192]; - - #[allow(clippy::expect_used)] - let (amt, src) = socket - .recv_from(&mut buf) - .await - .expect("didn't receive data"); - Ok((buf[..amt].to_owned(), src)) - } - BufferReader::MirrorReader(data, socket) => Ok((data.clone(), *socket)), - } - } -} - -impl DogStatsD { - #[must_use] - pub async fn new( - config: &DogStatsDConfig, - aggregator: Arc>, - cancel_token: tokio_util::sync::CancellationToken, - ) -> DogStatsD { - let addr = format!("{}:{}", config.host, config.port); - - // TODO (UDS socket) - #[allow(clippy::expect_used)] - let socket = tokio::net::UdpSocket::bind(addr) - .await - .expect("couldn't bind to address"); - DogStatsD { - cancel_token, - aggregator, - buffer_reader: BufferReader::UdpSocketReader(socket), - } - } - - pub async fn spin(self) { - let mut spin_cancelled = false; - while !spin_cancelled { - self.consume_statsd().await; - spin_cancelled = self.cancel_token.is_cancelled(); - } - } - - async fn consume_statsd(&self) { - #[allow(clippy::expect_used)] - let (buf, src) = self - .buffer_reader - .read() - .await - .expect("didn't receive data"); - - #[allow(clippy::expect_used)] - let msgs = std::str::from_utf8(&buf).expect("couldn't parse as string"); - debug!("Received message: {} from {}", msgs, src); - let statsd_metric_strings = msgs.split('\n'); - self.insert_metrics(statsd_metric_strings); - } - - fn insert_metrics(&self, msg: Split) { - let all_valid_metrics: Vec = msg - .filter(|m| { - !m.is_empty() - && !m.starts_with("_sc|") - && !m.starts_with("_e{") - // todo(serverless): remove this hack, and create a blocklist for metrics - // or another mechanism for this. - // - // avoid metric duplication with lambda layer - && !m.starts_with("aws.lambda.enhanced.invocations") - }) // exclude empty messages, service checks, and events - .map(|m| m.replace('\n', "")) - .filter_map(|m| match parse(m.as_str()) { - Ok(metric) => Some(metric), - Err(e) => { - // unsupported type is quite common with dd_trace metrics. Avoid perf issue and - // log spam in that case - match e { - UnsupportedType(_) => debug!("Unsupported metric type: {}. {}", m, e), - _ => error!("Failed to parse metric {}: {}", m, e), - } - None - } - }) - .collect(); - if !all_valid_metrics.is_empty() { - #[allow(clippy::expect_used)] - let mut guarded_aggregator = self.aggregator.lock().expect("lock poisoned"); - for a_valid_value in all_valid_metrics { - let _ = guarded_aggregator.insert(a_valid_value); - } - } - } -} - -#[cfg(test)] -#[allow(clippy::unwrap_used)] -mod tests { - use crate::aggregator::tests::assert_sketch; - use crate::aggregator::tests::assert_value; - use crate::aggregator::Aggregator; - use crate::dogstatsd::{BufferReader, DogStatsD}; - use crate::metric::EMPTY_TAGS; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use std::sync::{Arc, Mutex}; - use tracing_test::traced_test; - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_dogstatsd_multi_distribution() { - let locked_aggregator = setup_dogstatsd( - "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409 -single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409 -single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409 -", - ) - .await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - - let parsed_metrics = aggregator.distributions_to_protobuf(); - - assert_eq!(parsed_metrics.sketches.len(), 3); - assert_eq!(aggregator.to_series().len(), 0); - drop(aggregator); - - assert_sketch( - &locked_aggregator, - "single_machine_performance.rouster.api.series_v2.payload_size_bytes", - 269_942_f64, - 1656581400, - ); - assert_sketch( - &locked_aggregator, - "single_machine_performance.rouster.metrics_min_timestamp_latency", - 1_426.908_702_16, - 1656581400, - ); - assert_sketch( - &locked_aggregator, - "single_machine_performance.rouster.metrics_max_timestamp_latency", - 1_376.908_702_16, - 1656581400, - ); - } - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_dogstatsd_multi_metric() { - let mut now = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - now = (now / 10) * 10; - let locked_aggregator = setup_dogstatsd( - format!( - "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n", - now - ) - .as_str(), - ) - .await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - - let parsed_metrics = aggregator.to_series(); - - assert_eq!(parsed_metrics.len(), 3); - assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); - drop(aggregator); - - assert_value(&locked_aggregator, "metric1", 1.0, "", now); - assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2", now); - assert_value( - &locked_aggregator, - "metric3", - 3.0, - "tag3:val3,tag4:val4", - now, - ); - } - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_dogstatsd_single_metric() { - let locked_aggregator = setup_dogstatsd("metric123:99123|c|T1656581409").await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - let parsed_metrics = aggregator.to_series(); - - assert_eq!(parsed_metrics.len(), 1); - assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); - drop(aggregator); - - assert_value(&locked_aggregator, "metric123", 99_123.0, "", 1656581400); - } - - #[tokio::test] - #[traced_test] - #[cfg_attr(miri, ignore)] - async fn test_dogstatsd_filter_service_check() { - let locked_aggregator = setup_dogstatsd("_sc|servicecheck|0").await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - let parsed_metrics = aggregator.to_series(); - - assert!(!logs_contain("Failed to parse metric")); - assert_eq!(parsed_metrics.len(), 0); - } - - #[tokio::test] - #[traced_test] - #[cfg_attr(miri, ignore)] - async fn test_dogstatsd_filter_event() { - let locked_aggregator = setup_dogstatsd("_e{5,10}:event|test event").await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - let parsed_metrics = aggregator.to_series(); - - assert!(!logs_contain("Failed to parse metric")); - assert_eq!(parsed_metrics.len(), 0); - } - - async fn setup_dogstatsd(statsd_string: &str) -> Arc> { - let aggregator_arc = Arc::new(Mutex::new( - Aggregator::new(EMPTY_TAGS, 1_024).expect("aggregator creation failed"), - )); - let cancel_token = tokio_util::sync::CancellationToken::new(); - - let dogstatsd = DogStatsD { - cancel_token, - aggregator: Arc::clone(&aggregator_arc), - buffer_reader: BufferReader::MirrorReader( - statsd_string.as_bytes().to_vec(), - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0), - ), - }; - dogstatsd.consume_statsd().await; - - aggregator_arc - } -} diff --git a/dogstatsd/src/errors.rs b/dogstatsd/src/errors.rs deleted file mode 100644 index fffcca84bb..0000000000 --- a/dogstatsd/src/errors.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -//! Error types for `metrics` module - -/// Errors for the function [`crate::metric::Metric::parse`] -#[derive(Debug, thiserror::Error, PartialEq)] -pub enum ParseError { - /// Parse failure given in text - #[error("parse failure: {0}")] - Raw(String), - #[error("unsupported metric type: {0}")] - UnsupportedType(String), -} - -/// Failure to create a new `Aggregator` -#[derive(Debug, thiserror::Error, Clone, Copy)] -pub enum Creation { - /// The specified context max is too large given our constants. Indicates a - /// serious programming error. - #[error("context max is too large")] - Contexts, -} - -/// Failures from `Aggregator::insert` -#[derive(Debug, thiserror::Error)] -pub enum Insert { - /// The current interval is full and no further metrics can be inserted. The - /// inserted metric is returned. - #[error("interval is full")] - Overflow, - /// Unable to parse passed values - #[error(transparent)] - ValuesIteration(#[from] std::num::ParseFloatError), -} diff --git a/dogstatsd/src/flusher.rs b/dogstatsd/src/flusher.rs deleted file mode 100644 index 35a3f8c3de..0000000000 --- a/dogstatsd/src/flusher.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use crate::aggregator::Aggregator; -use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; -use reqwest::{Response, StatusCode}; -use std::sync::{Arc, Mutex}; -use std::time::Duration; -use tracing::{debug, error}; - -pub struct Flusher { - dd_api: DdApi, - aggregator: Arc>, -} - -pub struct FlusherConfig { - pub api_key: String, - pub aggregator: Arc>, - pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, - pub https_proxy: Option, - pub timeout: Duration, - pub retry_strategy: RetryStrategy, -} - -#[allow(clippy::await_holding_lock)] -impl Flusher { - pub fn new(config: FlusherConfig) -> Self { - let dd_api = DdApi::new( - config.api_key, - config.metrics_intake_url_prefix, - config.https_proxy, - config.timeout, - config.retry_strategy, - ); - Flusher { - dd_api, - aggregator: config.aggregator, - } - } - - pub async fn flush(&mut self) { - let (all_series, all_distributions) = { - #[allow(clippy::expect_used)] - let mut aggregator = self.aggregator.lock().expect("lock poisoned"); - ( - aggregator.consume_metrics(), - aggregator.consume_distributions(), - ) - }; - - let n_series = all_series.len(); - let n_distributions = all_distributions.len(); - - debug!("Flushing {n_series} series and {n_distributions} distributions"); - - let dd_api_clone = self.dd_api.clone(); - let series_handle = tokio::spawn(async move { - for a_batch in all_series { - let continue_shipping = - should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await; - if !continue_shipping { - break; - } - } - }); - let dd_api_clone = self.dd_api.clone(); - let distributions_handle = tokio::spawn(async move { - for a_batch in all_distributions { - let continue_shipping = - should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await; - if !continue_shipping { - break; - } - } - }); - - match tokio::try_join!(series_handle, distributions_handle) { - Ok(_) => { - debug!("Successfully flushed {n_series} series and {n_distributions} distributions") - } - Err(err) => { - error!("Failed to flush metrics{err}") - } - }; - } -} - -pub enum ShippingError { - Payload(String), - Destination(Option, String), -} - -async fn should_try_next_batch(resp: Result) -> bool { - match resp { - Ok(resp_payload) => match resp_payload.status() { - StatusCode::ACCEPTED => true, - unexpected_status_code => { - error!( - "{}: Failed to push to API: {:?}", - unexpected_status_code, - resp_payload.text().await.unwrap_or_default() - ); - true - } - }, - Err(ShippingError::Payload(msg)) => { - error!("Failed to prepare payload. Data dropped: {}", msg); - true - } - Err(ShippingError::Destination(sc, msg)) => { - error!("Error shipping data: {:?} {}", sc, msg); - false - } - } -} diff --git a/dogstatsd/src/lib.rs b/dogstatsd/src/lib.rs deleted file mode 100644 index 4009db1478..0000000000 --- a/dogstatsd/src/lib.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -#![cfg_attr(not(test), deny(clippy::panic))] -#![cfg_attr(not(test), deny(clippy::unwrap_used))] -#![cfg_attr(not(test), deny(clippy::expect_used))] -#![cfg_attr(not(test), deny(clippy::todo))] -#![cfg_attr(not(test), deny(clippy::unimplemented))] - -pub mod aggregator; -pub mod constants; -pub mod datadog; -pub mod dogstatsd; -pub mod errors; -pub mod flusher; -pub mod metric; diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs deleted file mode 100644 index 35ebe1628f..0000000000 --- a/dogstatsd/src/metric.rs +++ /dev/null @@ -1,661 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use crate::errors::ParseError; -use crate::{constants, datadog}; -use ddsketch_agent::DDSketch; -use fnv::FnvHasher; -use protobuf::Chars; -use regex::Regex; -use std::hash::{Hash, Hasher}; -use std::sync::OnceLock; -use ustr::Ustr; - -pub const EMPTY_TAGS: SortedTags = SortedTags { values: Vec::new() }; - -// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell?tab=metrics#dogstatsd-protocol-v13 -static METRIC_REGEX: OnceLock = OnceLock::new(); -fn get_metric_regex() -> &'static Regex { - #[allow(clippy::expect_used)] - METRIC_REGEX.get_or_init(|| { - Regex::new( - r"^(?P[^:]+):(?P[^|]+)\|(?P[a-zA-Z]+)(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?(?:\|c:(?P[^|]+))?(?:\|T(?P[^|]+))?$", - ) - .expect("Failed to create metric regex") - }) -} - -#[derive(Clone, Debug)] -pub enum MetricValue { - /// Dogstatsd 'count' metric type, monotonically increasing counter - Count(f64), - /// Dogstatsd 'gauge' metric type, point-in-time value - Gauge(f64), - /// Dogstatsd 'distribution' metric type, histogram - Distribution(DDSketch), -} - -impl MetricValue { - pub fn count(v: f64) -> MetricValue { - MetricValue::Count(v) - } - - pub fn gauge(v: f64) -> MetricValue { - MetricValue::Gauge(v) - } - - pub fn distribution(v: f64) -> MetricValue { - let sketch = &mut DDSketch::default(); - sketch.insert(v); - MetricValue::Distribution(sketch.to_owned()) - } -} - -#[derive(Clone, Debug)] -pub struct SortedTags { - // We sort tags. This is in feature parity with DogStatsD and also means - // that we avoid storing the same context multiple times because users have - // passed tags in different order through time. - values: Vec<(Ustr, Ustr)>, -} - -impl SortedTags { - pub fn extend(&mut self, other: &SortedTags) { - self.values.extend_from_slice(&other.values); - self.values.dedup(); - self.values.sort_unstable(); - } - - pub fn is_empty(&self) -> bool { - self.values.is_empty() - } - - pub fn parse(tags_section: &str) -> Result { - let total_tags = tags_section.bytes().filter(|&b| b == b',').count() + 1; - let mut parsed_tags = Vec::with_capacity(total_tags); - - for part in tags_section.split(',').filter(|s| !s.is_empty()) { - if let Some(i) = part.find(':') { - // Avoid creating a new string via split_once - let (k, v) = (&part[..i], &part[i + 1..]); - parsed_tags.push((Ustr::from(k), Ustr::from(v))); - } else { - parsed_tags.push((Ustr::from(part), Ustr::from(""))); - } - } - - parsed_tags.dedup(); - if parsed_tags.len() > constants::MAX_TAGS { - return Err(ParseError::Raw(format!( - "Too many tags, more than {c}", - c = constants::MAX_TAGS - ))); - } - - parsed_tags.sort_unstable(); - Ok(SortedTags { - values: parsed_tags, - }) - } - - pub fn to_chars(&self) -> Vec { - let mut tags_as_chars = Vec::new(); - for (k, v) in &self.values { - if v.is_empty() { - tags_as_chars.push(Chars::from(k.to_string())); - } else { - let mut a_tag = String::with_capacity(k.len() + v.len() + 1); - a_tag.push_str(k); - a_tag.push(':'); - a_tag.push_str(v); - tags_as_chars.push(a_tag.into()); - } - } - tags_as_chars - } - - pub fn to_strings(&self) -> Vec { - let mut tags_as_vec = Vec::new(); - for (k, v) in &self.values { - if v.is_empty() { - tags_as_vec.push(k.to_string()); - } else { - let mut a_tag = String::with_capacity(k.len() + v.len() + 1); - a_tag.push_str(k); - a_tag.push(':'); - a_tag.push_str(v); - tags_as_vec.push(a_tag); - } - } - tags_as_vec - } - - pub(crate) fn to_resources(&self) -> Vec { - let mut resources = Vec::with_capacity(constants::MAX_TAGS); - for (key, val) in &self.values { - if key == "dd.internal.resource" { - //anything coming in via dd.internal.resource: has to be a key/value pair - // (e.g., dd.internal.resource:key:value) - if let Some(valid_name_kind) = val.split_once(':') { - let resource = datadog::Resource { - name: valid_name_kind.0.to_string(), - kind: valid_name_kind.1.to_string(), - }; - resources.push(resource); - } - } - } - resources - } -} - -/// Representation of a dogstatsd Metric -/// -/// For now this implementation covers only counters and gauges. We hope this is -/// enough to demonstrate the impact of this program's design goals. -#[derive(Clone, Debug)] -pub struct Metric { - /// Name of the metric. - /// - /// Never more bytes than `constants::MAX_METRIC_NAME_BYTES`, - /// enforced by construction. Note utf8 issues. - pub name: Ustr, - /// Values of the metric. A singular value may be either a floating point or - /// a integer. Although undocumented we assume 64 bit. A single metric may - /// encode multiple values a time in a message. There must be at least one - /// value here, meaning that there is guaranteed to be a Some value in the - /// 0th index. - /// - /// Parsing of the values to an integer type is deferred until the last - /// moment. - /// - /// Never longer than `constants::MAX_VALUE_BYTES`. Note utf8 issues. - pub value: MetricValue, - /// Tags of the metric. - /// - /// The key is never longer than `constants::MAX_TAG_KEY_BYTES`, the value - /// never more than `constants::MAX_TAG_VALUE_BYTES`. These are enforced by - /// the parser. We assume here that tags are not sent in random order by the - /// clien or that, if they are, the API will tidy that up. That is `a:1,b:2` - /// is a different tagset from `b:2,a:1`. - pub tags: Option, - - /// ID given a name and tagset. - pub id: u64, - // Timestamp - pub timestamp: i64, -} - -impl Metric { - pub fn new( - name: Ustr, - value: MetricValue, - tags: Option, - timestamp: Option, - ) -> Metric { - #[allow(clippy::expect_used)] - let parsed_timestamp = timestamp_to_bucket(timestamp.unwrap_or_else(|| { - std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default() - })); - - let id = id(name, &tags, parsed_timestamp); - Metric { - name, - value, - tags, - id, - timestamp: parsed_timestamp, - } - } -} - -// Round down to the nearest 10 seconds -// to form a bucket of metric contexts aggregated per 10s -pub fn timestamp_to_bucket(timestamp: i64) -> i64 { - #[allow(clippy::expect_used)] - let now_seconds: i64 = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - if timestamp > now_seconds { - return (now_seconds / 10) * 10; - } - (timestamp / 10) * 10 -} - -/// Parse a metric from given input. -/// -/// This function parses a passed `&str` into a `Metric`. We assume that -/// `DogStatsD` metrics must be utf8 and are not ascii or some other encoding. -/// -/// # Errors -/// -/// This function will return with an error if the input violates any of the -/// limits in [`constants`]. Any non-viable input will be discarded. -/// example aj-test.increment:1|c|#user:aj-test from 127.0.0.1:50983 -pub fn parse(input: &str) -> Result { - // TODO must enforce / exploit constraints given in `constants`. - if let Some(caps) = get_metric_regex().captures(input) { - // unused for now - // let sample_rate = caps.name("sample_rate").map(|m| m.as_str()); - - let tags; - if let Some(tags_section) = caps.name("tags") { - tags = Some(SortedTags::parse(tags_section.as_str())?); - } else { - tags = None; - } - - #[allow(clippy::unwrap_used)] - let val = first_value(caps.name("values").unwrap().as_str())?; - - #[allow(clippy::unwrap_used)] - let t = caps.name("type").unwrap().as_str(); - - #[allow(clippy::expect_used)] - let now = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - // let Metric::new() handle bucketing the timestamp - let parsed_timestamp: i64 = match caps.name("timestamp") { - Some(ts) => timestamp_to_bucket(ts.as_str().parse().unwrap_or(now)), - None => timestamp_to_bucket(now), - }; - let metric_value = match t { - "c" => MetricValue::Count(val), - "g" => MetricValue::Gauge(val), - "d" => { - let sketch = &mut DDSketch::default(); - sketch.insert(val); - MetricValue::Distribution(sketch.to_owned()) - } - "h" | "s" | "ms" => { - return Err(ParseError::UnsupportedType(t.to_string())); - } - _ => { - return Err(ParseError::Raw(format!("Invalid metric type: {t}"))); - } - }; - #[allow(clippy::unwrap_used)] - let name = Ustr::from(caps.name("name").unwrap().as_str()); - let id = id(name, &tags, parsed_timestamp); - return Ok(Metric { - name, - value: metric_value, - tags, - id, - timestamp: parsed_timestamp, - }); - } - Err(ParseError::Raw(format!("Invalid metric format {input}"))) -} - -fn first_value(values: &str) -> Result { - match values.split(':').next() { - Some(v) => match v.parse::() { - Ok(v) => Ok(v), - Err(e) => Err(ParseError::Raw(format!("Invalid value {e}"))), - }, - None => Err(ParseError::Raw("Missing value".to_string())), - } -} - -/// Create an ID given a name and tagset. -/// -/// This function constructs a hash of the name, the tagset and that hash is -/// identical no matter the internal order of the tagset. That is, we consider a -/// tagset like "a:1,b:2,c:3" to be idential to "b:2,c:3,a:1" to "c:3,a:1,b:2" -/// etc. This implies that we must sort the tagset after parsing it, which we -/// do. Duplicate tags are removed, so "a:1,a:1" will -/// hash to the same ID of "a:1". -/// -/// Note also that because we take `Ustr` arguments its possible that we've -/// interned many possible combinations of a tagset, even if they are identical -/// from the point of view of this function. -#[inline] -#[must_use] -pub fn id(name: Ustr, tags: &Option, timestamp: i64) -> u64 { - let mut hasher = FnvHasher::default(); - - name.hash(&mut hasher); - timestamp.hash(&mut hasher); - if let Some(tags_present) = tags { - for kv in tags_present.values.iter() { - kv.0.as_bytes().hash(&mut hasher); - kv.1.as_bytes().hash(&mut hasher); - } - } - hasher.finish() -} -// :::||@|#:, -// :,|T|c: -// -// Types: -// * c -- COUNT, allows packed values, summed -// * g -- GAUGE, allows packed values, last one wins -// -// SAMPLE_RATE ignored for the sake of simplicity. - -#[cfg(test)] -#[allow(clippy::unwrap_used)] -mod tests { - use proptest::{collection, option, strategy::Strategy, string::string_regex}; - use ustr::Ustr; - - use crate::metric::{id, parse, timestamp_to_bucket, MetricValue, SortedTags}; - - use super::ParseError; - - fn metric_name() -> impl Strategy { - string_regex("[a-zA-Z0-9.-]{1,128}").unwrap() - } - - fn metric_values() -> impl Strategy { - string_regex("[0-9]+(:[0-9]){0,8}").unwrap() - } - - fn metric_type() -> impl Strategy { - string_regex("g|c").unwrap() - } - - fn metric_tagset() -> impl Strategy> { - option::of( - string_regex("[a-zA-Z]{1,64}:[a-zA-Z]{1,64}(,[a-zA-Z]{1,64}:[a-zA-Z]{1,64}){0,31}") - .unwrap(), - ) - } - - fn metric_tags() -> impl Strategy> { - collection::vec(("[a-z]{1,8}", "[A-Z]{1,8}"), 0..32) - } - - proptest::proptest! { - // For any valid name, tags et al the parse routine is able to parse an - // encoded metric line. - #[test] - #[cfg_attr(miri, ignore)] - fn parse_valid_inputs( - name in metric_name(), - values in metric_values(), - mtype in metric_type(), - tagset in metric_tagset() - ) { - let input = if let Some(ref tagset) = tagset { - format!("{name}:{values}|{mtype}|#{tagset}") - } else { - format!("{name}:{values}|{mtype}") - }; - let metric = parse(&input).unwrap(); - assert_eq!(name, metric.name.as_str()); - - if let Some(tags) = tagset { - let parsed_metric_tags : SortedTags = metric.tags.unwrap(); - assert_eq!(tags.split(',').count(), parsed_metric_tags.values.len()); - tags.split(',').for_each(|kv| { - let (original_key, original_value) = kv.split_once(':').unwrap(); - let mut found = false; - for (k,v) in parsed_metric_tags.values.iter() { - // TODO not sure who to handle duplicate keys. To make the test pass, just find any match instead of first - if *k == Ustr::from(original_key) && *v == Ustr::from(original_value) { - found = true; - } - } - assert!(found); - }); - } else { - assert!(metric.tags.is_none()); - } - - match mtype.as_str() { - "c" => { - if let MetricValue::Count(v) = metric.value { - assert_eq!(v, values.split(':').next().unwrap().parse::().unwrap()); - } else { - panic!("Expected count metric"); - } - } - "g" => { - if let MetricValue::Gauge(v) = metric.value { - assert_eq!(v, values.split(':').next().unwrap().parse::().unwrap()); - } else { - panic!("Expected gauge metric"); - } - } - "d" => { - if let MetricValue::Distribution(d) = metric.value { - assert_eq!(d.min().unwrap(), values.split(':').next().unwrap().parse::().unwrap()); - } else { - panic!("Expected distribution metric"); - } - } - _ => { - panic!("Invalid metric format"); - } - } - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_missing_name_and_value( - mtype in metric_type(), - tagset in metric_tagset() - ) { - let input = if let Some(ref tagset) = tagset { - format!("|{mtype}|#{tagset}") - } else { - format!("|{mtype}") - }; - let result = parse(&input); - - assert_eq!(result.unwrap_err(),ParseError::Raw(format!("Invalid metric format {input}"))); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_invalid_name_and_value_format( - name in metric_name(), - values in metric_values(), - mtype in metric_type(), - tagset in metric_tagset() - ) { - // If there is a ':' in the values we cannot distinguish where the - // name and the first value are. - let value = values.split(':').next().unwrap(); - let input = if let Some(ref tagset) = tagset { - format!("{name}{value}|{mtype}|#{tagset}") - } else { - format!("{name}{value}|{mtype}") - }; - let result = parse(&input); - - let verify = result.unwrap_err().to_string(); - assert!(verify.starts_with("parse failure: Invalid metric format ")); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_unsupported_metric_type( - name in metric_name(), - values in metric_values(), - mtype in "[abefijklmnopqrtuvwxyz]", - tagset in metric_tagset() - ) { - let input = if let Some(ref tagset) = tagset { - format!("{name}:{values}|{mtype}|#{tagset}") - } else { - format!("{name}:{values}|{mtype}") - }; - let result = parse(&input); - - assert_eq!( - result.unwrap_err(), - ParseError::Raw(format!("Invalid metric type: {mtype}")) - ); - } - - // The ID of a name, tagset is the same even if the tagset is in a - // distinct order. - // For any valid name, tags et al the parse routine is able to parse an - // encoded metric line. - #[test] - #[cfg_attr(miri, ignore)] - fn id_consistent(name in metric_name(), - mut tags in metric_tags()) { - let mut tagset1 = String::new(); - let mut tagset2 = String::new(); - let now = timestamp_to_bucket(std::time::UNIX_EPOCH.elapsed().expect("unable to poll clock, unrecoverable").as_secs().try_into().unwrap_or_default()); - - for (k,v) in &tags { - tagset1.push_str(k); - tagset1.push(':'); - tagset1.push_str(v); - tagset1.push(','); - } - tags.reverse(); - for (k,v) in &tags { - tagset2.push_str(k); - tagset2.push(':'); - tagset2.push_str(v); - tagset2.push(','); - } - if !tags.is_empty() { - tagset1.pop(); - tagset2.pop(); - } - - let id1 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset1).unwrap()), now); - let id2 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset2).unwrap()), now); - - assert_eq!(id1, id2); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn resources_key_val_order(tags in metric_tags()) { - let sorted_tags = SortedTags { values: tags.into_iter() - .map(|(kind, name)| (Ustr::from(&kind), Ustr::from(&name))) - .collect() }; - - let resources = sorted_tags.to_resources(); - - for (i, resource) in resources.iter().enumerate() { - assert_eq!(resource.kind, sorted_tags.values[i].0); - assert_eq!(resource.name, sorted_tags.values[i].1); - } - } - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_too_many_tags() { - // 101 - assert_eq!( - parse( - "foo:1|g|#a:1,b:2,c:3,d:4,e:5,f:6,g:7,h:8,i:9,j:10,k:11,l:12,m:13,n:14,o:15,p:16,q:17,r:18,s:19,t:20,u:21,v:22,w:23,x:24,y:25,z:26,aa:27,ab:28,ac:29,ad:30,ae:31,af:32,ag:33,ah:34,ai:35,aj:36,ak:37,al:38,am:39,an:40,ao:41,ap:42,aq:43,ar:44,as:45,at:46,au:47,av:48,aw:49,ax:50,ay:51,az:52,ba:53,bb:54,bc:55,bd:56,be:57,bf:58,bg:59,bh:60,bi:61,bj:62,bk:63,bl:64,bm:65,bn:66,bo:67,bp:68,bq:69,br:70,bs:71,bt:72,bu:73,bv:74,bw:75,bx:76,by:77,bz:78,ca:79,cb:80,cc:81,cd:82,ce:83,cf:84,cg:85,ch:86,ci:87,cj:88,ck:89,cl:90,cm:91,cn:92,co:93,cp:94,cq:95,cr:96,cs:97,ct:98,cu:99,cv:100,cw:101" - ).unwrap_err(), - ParseError::Raw("Too many tags, more than 100".to_string()) - ); - - // 30 - assert!(parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3").is_ok()); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn invalid_dogstatsd_no_panic() { - assert!(parse("somerandomstring|c+a;slda").is_err()); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_container_id() { - assert!(parse("containerid.metric:0|c|#env:dev,client_transport:udp|c:0000000000000000000000000000000000000000000000000000000000000000").is_ok()); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_tag_no_value() { - let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go,lang_version:go1.22.10,_dd.origin:lambda,runtime-id:d66f501c-d09b-4d0d-970f-515235c4eb56,v1.65.1,service:aws.lambda,reason:scheduled"); - assert!(result.is_ok()); - assert!(result - .unwrap() - .tags - .unwrap() - .values - .iter() - .any(|(k, v)| k == "v1.65.1" && v.is_empty())); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_tag_multi_column() { - let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go:and:something:else"); - assert!(result.is_ok()); - assert_eq!( - result.unwrap().tags.unwrap().values[0], - (Ustr::from("lang"), Ustr::from("go:and:something:else")) - ); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_tracer_metric() { - let input = "datadog.tracer.flush_duration:0.785551|ms|#lang:go,lang_version:go1.23.2,env:redacted_env,_dd.origin:lambda,runtime-id:redacted_runtime,tracer_version:v1.70.1,service:redacted_service,env:redacted_env,service:redacted_service,version:redacted_version"; - let expected_error = "ms".to_string(); - if let ParseError::UnsupportedType(actual_error) = parse(input).unwrap_err() { - assert_eq!(actual_error, expected_error); - } else { - panic!("Expected UnsupportedType error"); - } - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_metric_timestamp() { - // Important to test that we round down to the nearest 10 seconds - // for our buckets - let input = "page.views:15|c|#env:dev|T1656581409"; - let metric = parse(input).unwrap(); - assert_eq!(metric.timestamp, 1656581400); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn parse_metric_no_timestamp() { - // *wince* this could be a race condition - // we round the timestamp down to a 10s bucket and I want to test now - // but if the timestamp rolls over to the next bucket time and the test - // is somehow slower than 1s then the test will fail. - // come bug me if I wrecked your CI run - let input = "page.views:15|c|#env:dev"; - let metric = parse(input).unwrap(); - let now: i64 = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - assert_eq!(metric.timestamp, (now / 10) * 10); - } - - #[test] - fn sorting_tags() { - let mut tags = SortedTags::parse("z:z0,b:b2,c:c3").unwrap(); - tags.extend(&SortedTags::parse("z1:z11,d:d4,e:e5,f:f6").unwrap()); - tags.extend(&SortedTags::parse("a:a1").unwrap()); - assert_eq!(tags.values.len(), 8); - let first_element = tags.values.first().unwrap(); - assert_eq!(first_element.0, Ustr::from("a")); - assert_eq!(first_element.1, Ustr::from("a1")); - } -} diff --git a/dogstatsd/tests/integration_test.rs b/dogstatsd/tests/integration_test.rs deleted file mode 100644 index 641c3ebd48..0000000000 --- a/dogstatsd/tests/integration_test.rs +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use dogstatsd::metric::SortedTags; -use dogstatsd::{ - aggregator::Aggregator as MetricsAggregator, - constants::CONTEXTS, - datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, - dogstatsd::{DogStatsD, DogStatsDConfig}, - flusher::{Flusher, FlusherConfig}, -}; -use mockito::Server; -use std::sync::{Arc, Mutex}; -use tokio::{ - net::UdpSocket, - time::{sleep, timeout, Duration}, -}; -use tokio_util::sync::CancellationToken; - -#[cfg(test)] -#[cfg(not(miri))] -#[tokio::test] -async fn dogstatsd_server_ships_series() { - use dogstatsd::datadog::RetryStrategy; - - let mut mock_server = Server::new_async().await; - - let mock = mock_server - .mock("POST", "/api/v2/series") - .match_header("DD-API-KEY", "mock-api-key") - .match_header("Content-Type", "application/json") - .with_status(202) - .create_async() - .await; - - let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new(SortedTags::parse("sometkey:somevalue").unwrap(), CONTEXTS) - .expect("failed to create aggregator"), - )); - - let _ = start_dogstatsd(&metrics_aggr).await; - - let mut metrics_flusher = Flusher::new(FlusherConfig { - api_key: "mock-api-key".to_string(), - aggregator: Arc::clone(&metrics_aggr), - metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( - None, - MetricsIntakeUrlPrefixOverride::maybe_new( - None, - Some(DdDdUrl::new(mock_server.url()).expect("failed to create URL")), - ), - ) - .expect("failed to create URL"), - https_proxy: None, - timeout: std::time::Duration::from_secs(5), - retry_strategy: RetryStrategy::Immediate(3), - }); - - let server_address = "127.0.0.1:18125"; - let socket = UdpSocket::bind("0.0.0.0:0") - .await - .expect("unable to bind UDP socket"); - let metric = "custom_metric:1|g"; - - socket - .send_to(metric.as_bytes(), &server_address) - .await - .expect("unable to send metric"); - - let flush = async { - while !mock.matched() { - sleep(Duration::from_millis(100)).await; - metrics_flusher.flush().await; - } - }; - - let result = timeout(Duration::from_millis(1000), flush).await; - - match result { - Ok(_) => mock.assert(), - Err(_) => panic!("timed out before server received metric flush"), - } -} - -async fn start_dogstatsd(metrics_aggr: &Arc>) -> CancellationToken { - let dogstatsd_config = DogStatsDConfig { - host: "127.0.0.1".to_string(), - port: 18125, - }; - let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); - let dogstatsd_client = DogStatsD::new( - &dogstatsd_config, - Arc::clone(metrics_aggr), - dogstatsd_cancel_token.clone(), - ) - .await; - - tokio::spawn(async move { - dogstatsd_client.spin().await; - }); - - dogstatsd_cancel_token -} - -#[cfg(test)] -#[cfg(not(miri))] -#[tokio::test] -async fn test_send_with_retry_immediate_failure() { - use dogstatsd::datadog::{DdApi, DdDdUrl, RetryStrategy}; - use dogstatsd::metric::{parse, SortedTags}; - - let mut server = Server::new_async().await; - let mock = server - .mock("POST", "/api/v2/series") - .with_status(500) - .with_body("Internal Server Error") - .expect(3) - .create_async() - .await; - - let retry_strategy = RetryStrategy::Immediate(3); - let dd_api = DdApi::new( - "test_key".to_string(), - MetricsIntakeUrlPrefix::new( - None, - MetricsIntakeUrlPrefixOverride::maybe_new( - None, - Some(DdDdUrl::new(server.url()).expect("failed to create URL")), - ), - ) - .expect("failed to create URL"), - None, - Duration::from_secs(1), - retry_strategy.clone(), - ); - - // Create a series using the Aggregator - let mut aggregator = MetricsAggregator::new(SortedTags::parse("test:value").unwrap(), 1) - .expect("failed to create aggregator"); - let metric = parse("test:1|c").expect("failed to parse metric"); - aggregator.insert(metric).expect("failed to insert metric"); - let series = aggregator.to_series(); - - let result = dd_api.ship_series(&series).await; - - // The result should be an error since we got a 500 response - assert!(result.is_err()); - - // Verify that the mock was called exactly 3 times - mock.assert_async().await; -} - -#[cfg(test)] -#[cfg(not(miri))] -#[tokio::test] -async fn test_send_with_retry_linear_backoff_success() { - use dogstatsd::datadog::{DdApi, DdDdUrl, RetryStrategy}; - use dogstatsd::metric::{parse, SortedTags}; - - let mut server = Server::new_async().await; - let mock = server - .mock("POST", "/api/v2/series") - .with_status(500) - .with_body("Internal Server Error") - .expect(1) - .create_async() - .await; - - let success_mock = server - .mock("POST", "/api/v2/series") - .with_status(200) - .with_body("Success") - .expect(1) - .create_async() - .await; - - let retry_strategy = RetryStrategy::LinearBackoff(3, 1); // 3 attempts, 1ms delay - let dd_api = DdApi::new( - "test_key".to_string(), - MetricsIntakeUrlPrefix::new( - None, - MetricsIntakeUrlPrefixOverride::maybe_new( - None, - Some(DdDdUrl::new(server.url()).expect("failed to create URL")), - ), - ) - .expect("failed to create URL"), - None, - Duration::from_secs(1), - retry_strategy.clone(), - ); - - // Create a series using the Aggregator - let mut aggregator = MetricsAggregator::new(SortedTags::parse("test:value").unwrap(), 1) - .expect("failed to create aggregator"); - let metric = parse("test:1|c").expect("failed to parse metric"); - aggregator.insert(metric).expect("failed to insert metric"); - let series = aggregator.to_series(); - - let result = dd_api.ship_series(&series).await; - - // The result should be Ok since we got a 200 response on retry - assert!(result.is_ok()); - if let Ok(response) = result { - assert_eq!(response.status(), reqwest::StatusCode::OK); - } else { - panic!("Expected Ok result"); - } - - // Verify that both mocks were called exactly once - mock.assert_async().await; - success_mock.assert_async().await; -} - -#[cfg(test)] -#[cfg(not(miri))] -#[tokio::test] -async fn test_send_with_retry_immediate_failure_after_one_attempt() { - use dogstatsd::datadog::{DdApi, DdDdUrl, RetryStrategy}; - use dogstatsd::flusher::ShippingError; - use dogstatsd::metric::{parse, SortedTags}; - - let mut server = Server::new_async().await; - let mock = server - .mock("POST", "/api/v2/series") - .with_status(500) - .with_body("Internal Server Error") - .expect(1) - .create_async() - .await; - - let retry_strategy = RetryStrategy::Immediate(1); // Only 1 attempt - let dd_api = DdApi::new( - "test_key".to_string(), - MetricsIntakeUrlPrefix::new( - None, - MetricsIntakeUrlPrefixOverride::maybe_new( - None, - Some(DdDdUrl::new(server.url()).expect("failed to create URL")), - ), - ) - .expect("failed to create URL"), - None, - Duration::from_secs(1), - retry_strategy.clone(), - ); - - // Create a series using the Aggregator - let mut aggregator = MetricsAggregator::new(SortedTags::parse("test:value").unwrap(), 1) - .expect("failed to create aggregator"); - let metric = parse("test:1|c").expect("failed to parse metric"); - aggregator.insert(metric).expect("failed to insert metric"); - let series = aggregator.to_series(); - - let result = dd_api.ship_series(&series).await; - - // The result should be an error since we got a 500 response - assert!(result.is_err()); - if let Err(ShippingError::Destination(Some(status), msg)) = result { - assert_eq!(status, reqwest::StatusCode::INTERNAL_SERVER_ERROR); - assert_eq!(msg, "Failed to send request after 1 attempts"); - } else { - panic!("Expected ShippingError::Destination with status 500"); - } - - // Verify that the mock was called exactly once - mock.assert_async().await; -} diff --git a/serverless/Cargo.toml b/serverless/Cargo.toml index fd51966530..d4e68922eb 100644 --- a/serverless/Cargo.toml +++ b/serverless/Cargo.toml @@ -9,7 +9,7 @@ env_logger = "0.10.0" datadog-trace-mini-agent = { path = "../trace-mini-agent" } datadog-trace-protobuf = { path = "../trace-protobuf" } datadog-trace-utils = { path = "../trace-utils" } -dogstatsd = { path = "../dogstatsd" } +dogstatsd = { git = "https://github.com/DataDog/serverless-components/", rev = "4dfe72ab1850680f41dd79d30a937eb68e7ba6da", default-features = false } tokio = { version = "1", features = ["macros", "rt-multi-thread"]} tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } diff --git a/tools/docker/Dockerfile.build b/tools/docker/Dockerfile.build index 246c61c168..17d9b6a219 100644 --- a/tools/docker/Dockerfile.build +++ b/tools/docker/Dockerfile.build @@ -77,7 +77,6 @@ COPY "ddcommon-ffi/Cargo.toml" "ddcommon-ffi/" COPY "ddtelemetry/Cargo.toml" "ddtelemetry/" COPY "ddtelemetry-ffi/Cargo.toml" "ddtelemetry-ffi/" COPY "ddsketch/Cargo.toml" "ddsketch/" -COPY "dogstatsd/Cargo.toml" "dogstatsd/" COPY "dogstatsd-client/Cargo.toml" "dogstatsd-client/" COPY "dynamic-configuration/Cargo.toml" "dynamic-configuration/" COPY "library-config-ffi/Cargo.toml" "library-config-ffi/"