diff --git a/Cargo.lock b/Cargo.lock index e207d60a00ce8..d6072dad0a125 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7043,6 +7043,55 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.3", + "tracing 0.1.41", +] + +[[package]] +name = "opentelemetry-http" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8863faf2910030d139fb48715ad5ff2f35029fc5f244f6d5f689ddcf4d26253" +dependencies = [ + "async-trait", + "bytes 1.10.1", + "http 1.1.0", + "opentelemetry", + "reqwest 0.12.9", + "tracing 0.1.41", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bef114c6d41bea83d6dc60eb41720eedd0261a67af57b66dd2b84ac46c01d91" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto 0.28.0", + "opentelemetry_sdk", + "prost 0.13.3", + "reqwest 0.12.9", + "thiserror 2.0.3", + "tokio", + "tonic 0.12.3", + "tracing 0.1.41", +] + [[package]] name = "opentelemetry-proto" version = "0.1.0" @@ -7060,6 +7109,39 @@ dependencies = [ "vrl", ] +[[package]] +name = "opentelemetry-proto" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.13.3", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 2.0.3", + "tokio", + "tokio-stream", + "tracing 0.1.41", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -8627,6 +8709,7 @@ checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "base64 0.22.1", "bytes 1.10.1", + "futures-channel", "futures-core", "futures-util", "http 1.1.0", @@ -11606,6 +11689,9 @@ dependencies = [ "openssl", "openssl-probe", "openssl-src", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "ordered-float 4.6.0", "paste", "percent-encoding", @@ -11915,7 +12001,7 @@ dependencies = [ "codecs", "enrichment", "file-source", - "opentelemetry-proto", + "opentelemetry-proto 0.1.0", "prometheus-parser", "vector-api-client", "vector-buffers", diff --git a/Cargo.toml b/Cargo.toml index fdbcfb5e25eed..1b29e2f4b6f59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -302,6 +302,9 @@ number_prefix = { version = "0.4.0", default-features = false, features = ["std" ratatui = { version = "0.29.0", optional = true, default-features = false, features = ["crossterm"] } # Opentelemetry +opentelemetry = { version = "0.28.0", optional = true, features = ["metrics"] } +opentelemetry_sdk = { version = "0.28.0", optional = true, features = ["metrics", "experimental_async_runtime", "rt-tokio"] } +opentelemetry-otlp = { version = "0.28.0", optional = true, features = ["metrics", "http-proto", "grpc-tonic"] } hex = { version = "0.4.3", default-features = false, optional = true } @@ -775,6 +778,7 @@ sinks-metrics = [ "sinks-humio", "sinks-influxdb", "sinks-kafka", + "sinks-opentelemetry", "sinks-prometheus", "sinks-sematext", "sinks-statsd", @@ -821,7 +825,7 @@ sinks-mqtt = ["dep:rumqttc"] sinks-nats = ["dep:async-nats", "dep:nkeys"] sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] -sinks-opentelemetry = ["sinks-http"] +sinks-opentelemetry = ["sinks-http", "vector-lib/opentelemetry", "dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp"] sinks-papertrail = ["dep:syslog"] sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"] sinks-postgres = ["dep:sqlx"] @@ -1015,6 +1019,7 @@ loki-benches = ["sinks-loki"] enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] proptest = ["dep:proptest", "dep:proptest-derive", "vrl/proptest"] + [[bench]] name = "default" harness = false diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 7f7cb766ac1b3..df5c7906c3d28 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -441,6 +441,11 @@ openssl,https://github.com/sfackler/rust-openssl,Apache-2.0,Steven Fackler openssl-sys,https://github.com/sfackler/rust-openssl,MIT,"Alex Crichton , Steven Fackler " +opentelemetry,https://github.com/open-telemetry/opentelemetry-rust,Apache-2.0,The opentelemetry Authors +opentelemetry-http,https://github.com/open-telemetry/opentelemetry-rust,Apache-2.0,The opentelemetry-http Authors +opentelemetry-otlp,https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-otlp,Apache-2.0,The opentelemetry-otlp Authors +opentelemetry-proto,https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-proto,Apache-2.0,The opentelemetry-proto Authors +opentelemetry_sdk,https://github.com/open-telemetry/opentelemetry-rust,Apache-2.0,The opentelemetry_sdk Authors ordered-float,https://github.com/reem/rust-ordered-float,MIT,"Jonathan Reem , Matt Brubeck " outref,https://github.com/Nugine/outref,MIT,The outref Authors overload,https://github.com/danaugrs/overload,MIT,Daniel Salvadori diff --git a/changelog.d/opentelemetry_metrics_sink.feature.md b/changelog.d/opentelemetry_metrics_sink.feature.md new file mode 100644 index 0000000000000..c73e49fa943e6 --- /dev/null +++ b/changelog.d/opentelemetry_metrics_sink.feature.md @@ -0,0 +1,3 @@ +Added support for sendings metrics via the OpenTelemetry sink to OpenTelemetry collectors + +authors: brittonhayes diff --git a/lib/opentelemetry-proto/build.rs b/lib/opentelemetry-proto/build.rs index 4e642ce8ab283..edc41ca0417a9 100644 --- a/lib/opentelemetry-proto/build.rs +++ b/lib/opentelemetry-proto/build.rs @@ -9,10 +9,10 @@ fn main() -> Result<(), Error> { "src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto", - "src/proto/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto", ], &["src/proto/opentelemetry-proto"], diff --git a/src/sinks/opentelemetry/metrics.rs b/src/sinks/opentelemetry/metrics.rs new file mode 100644 index 0000000000000..d7a7f23124241 --- /dev/null +++ b/src/sinks/opentelemetry/metrics.rs @@ -0,0 +1,201 @@ +use crate::event::metric::{Metric as VectorMetric, MetricValue}; +use std::task::{Context, Poll}; +use vector_config::configurable_component; + +use futures::future::{self, BoxFuture}; +use http::StatusCode; +use hyper::Body; +use tower::Service; +use tracing::{debug, trace}; +use vector_lib::event::EventStatus; + +use opentelemetry::metrics::{Meter, MeterProvider}; +use opentelemetry::KeyValue; +use opentelemetry_otlp::{MetricExporter, WithExportConfig}; +use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality}; + +use crate::event::Event; +use crate::sinks::util::PartitionInnerBuffer; +use futures_util::stream::BoxStream; +use vector_lib::sink::StreamSink; + +/// The aggregation temporality to use for metrics. +#[configurable_component] +#[derive(Clone, Copy, Debug)] +#[serde(rename_all = "snake_case")] +pub enum AggregationTemporality { + /// Delta temporality means that metrics are reported as changes since the last report. + Delta, + /// Cumulative temporality means that metrics are reported as cumulative changes since a fixed start time. + Cumulative, +} + +impl Default for AggregationTemporality { + fn default() -> Self { + Self::Cumulative + } +} + +// Add conversion from AggregationTemporality to the OpenTelemetry SDK's Temporality +impl From for Temporality { + fn from(temporality: AggregationTemporality) -> Self { + match temporality { + AggregationTemporality::Delta => Temporality::Delta, + AggregationTemporality::Cumulative => Temporality::Cumulative, + } + } +} + +#[derive(Default)] +pub struct OpentelemetryMetricNormalize; + +// Implementation using the OpenTelemetry SDK +pub struct OpentelemetryMetricsSvc { + meter_provider: SdkMeterProvider, + meter: Meter, + namespace: String, +} + +impl OpentelemetryMetricsSvc { + pub fn new( + namespace: String, + endpoint: String, + temporality: AggregationTemporality, + ) -> crate::Result { + // Create the exporter + let exporter = MetricExporter::builder() + .with_http() + .with_endpoint(endpoint) + .with_temporality(Temporality::from(temporality)) + .build() + .map_err(|e| crate::Error::from(format!("Failed to build metrics exporter: {}", e)))?; + + // Create the meter provider with the exporter + let provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter) + .build(); + + let meter = provider.meter("vector"); + + Ok(Self { + meter_provider: provider, + meter, + namespace, + }) + } + + // Convert and record Vector metrics using the OpenTelemetry SDK + fn convert_and_record_metrics(&self, events: Vec) { + for event in events { + let metric_name = event.name().to_string(); + let attributes = event + .tags() + .map(|tags| { + tags.iter_single() + .map(|(k, v)| KeyValue::new(k.to_string(), v.to_string())) + .collect::>() + }) + .unwrap_or_default(); + + // Add the service.name attribute with the namespace + let mut all_attributes = vec![KeyValue::new("service.name", self.namespace.clone())]; + all_attributes.extend(attributes); + + match event.value() { + MetricValue::Counter { value } => { + let counter = self.meter.f64_counter(metric_name).build(); + counter.add(*value, &all_attributes); + } + MetricValue::Gauge { value } => { + // For gauges, we use a counter since observable gauges require callbacks + let counter = self + .meter + .f64_counter(format!("{}_gauge", metric_name)) + .build(); + counter.add(*value, &all_attributes); + } + MetricValue::Distribution { samples, .. } => { + let histogram = self.meter.f64_histogram(metric_name).build(); + for sample in samples { + // Record each sample with its rate + for _ in 0..sample.rate { + histogram.record(sample.value, &all_attributes); + } + } + } + MetricValue::Set { values } => { + // For sets, we record the count of unique values + let counter = self + .meter + .f64_counter(format!("{}_set", metric_name)) + .build(); + counter.add(values.len() as f64, &all_attributes); + } + _ => {} + } + } + } +} + +impl Service, String>> for OpentelemetryMetricsSvc { + type Response = http::Response; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, items: PartitionInnerBuffer, String>) -> Self::Future { + let (metrics, _namespace) = items.into_parts(); + + // Convert and record metrics + self.convert_and_record_metrics(metrics); + + // The SDK handles the export asynchronously, so we just return a success response + Box::pin(future::ok( + http::Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .unwrap(), + )) + } +} + +impl Drop for OpentelemetryMetricsSvc { + fn drop(&mut self) { + // Ensure metrics are exported before shutting down + if let Err(err) = self.meter_provider.shutdown() { + error!("Error shutting down meter provider: {:?}", err); + } + } +} + +#[async_trait::async_trait] +impl StreamSink for OpentelemetryMetricsSvc { + async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + use futures::StreamExt; + + debug!("OpenTelemetry metrics sink started"); + + while let Some(mut event) = input.next().await { + // Extract finalizers before processing + let finalizers = event.metadata_mut().take_finalizers(); + + // Extract metrics from the event + if let Event::Metric(metric) = event { + trace!("Processing metric event: {}", metric.name()); + // Process the metric + self.convert_and_record_metrics(vec![metric]); + } else { + trace!("Ignoring non-metric event"); + } + + // Finalize the event with success status + finalizers.update_status(EventStatus::Delivered); + } + + debug!("OpenTelemetry metrics sink stopped"); + Ok(()) + } +} diff --git a/src/sinks/opentelemetry/mod.rs b/src/sinks/opentelemetry/mod.rs index 90d26da27e202..fe895e9051baa 100644 --- a/src/sinks/opentelemetry/mod.rs +++ b/src/sinks/opentelemetry/mod.rs @@ -1,20 +1,71 @@ -use crate::codecs::{EncodingConfigWithFraming, Transformer}; -use crate::config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}; -use crate::sinks::http::config::{HttpMethod, HttpSinkConfig}; -use crate::sinks::{Healthcheck, VectorSink}; +use tracing::debug; + +use crate::{ + codecs::{EncodingConfigWithFraming, Transformer}, + config::{AcknowledgementsConfig, SinkConfig, SinkContext}, + sinks::{http::config::HttpSinkConfig, Healthcheck, VectorSink}, +}; use indoc::indoc; use vector_config::component::GenerateConfig; -use vector_lib::codecs::encoding::{FramingConfig, SerializerConfig}; use vector_lib::codecs::JsonSerializerConfig; use vector_lib::configurable::configurable_component; +use vector_lib::{ + codecs::encoding::{FramingConfig, SerializerConfig}, + config::Input, +}; + +mod metrics; +use metrics::AggregationTemporality; + +use super::http::config::HttpMethod; /// Configuration for the `OpenTelemetry` sink. -#[configurable_component(sink("opentelemetry", "Deliver OTLP data over HTTP."))] -#[derive(Clone, Debug, Default)] +#[configurable_component(sink( + "opentelemetry", + "Deliver OTLP data (logs, metrics, traces) over HTTP." +))] +#[derive(Clone, Debug)] pub struct OpenTelemetryConfig { - /// Protocol configuration + /// Protocol configuration. #[configurable(derived)] protocol: Protocol, + + /// The endpoint to send OpenTelemetry logs, metrics, and traces to. + /// + /// This should be a full URL, including the protocol (e.g. `https://`). + /// If not specified, telemetry will not be sent. + #[configurable(metadata(docs::examples = "http://localhost:4317/v1/metrics"))] + pub endpoint: Option, + + /// The endpoint to send healthcheck requests to. + /// + /// This should be a full URL, including the protocol (e.g. `https://`). + #[configurable(metadata(docs::examples = "http://localhost:13133"))] + pub healthcheck_endpoint: Option, + + /// The default namespace to use for metrics that do not have one. + /// + /// Metrics with the same name can only be differentiated by their namespace. + #[configurable(metadata(docs::examples = "myservice"))] + pub default_namespace: Option, + + /// The aggregation temporality to use for metrics. + /// + /// This determines how metrics are aggregated over time. + #[serde(default)] + pub aggregation_temporality: AggregationTemporality, +} + +impl Default for OpenTelemetryConfig { + fn default() -> Self { + Self { + protocol: Protocol::default(), + endpoint: None, + healthcheck_endpoint: None, + default_namespace: Some("vector".to_string()), + aggregation_temporality: AggregationTemporality::Cumulative, + } + } } /// The protocol used to send data to OpenTelemetry. @@ -59,6 +110,11 @@ impl GenerateConfig for OpenTelemetryConfig { type = "http" uri = "http://localhost:5318/v1/logs" encoding.codec = "json" + + metrics_endpoint = "http://localhost:4317/v1/metrics" + healthcheck_endpoint = "http://localhost:13133" + default_namespace = "vector" + aggregation_temporality = "cumulative" "#}) .unwrap() } @@ -68,9 +124,64 @@ impl GenerateConfig for OpenTelemetryConfig { #[typetag::serde(name = "opentelemetry")] impl SinkConfig for OpenTelemetryConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - match &self.protocol { - Protocol::Http(config) => config.build(cx).await, - } + use crate::sinks::util::UriSerde; + use futures::FutureExt; + use std::str::FromStr; + // use vector_lib::sink::VectorSink; + + // Build the logs/traces sink + let (logs_sink, logs_healthcheck) = match &self.protocol { + Protocol::Http(config) => config.build(cx.clone()).await?, + }; + + // Build the metrics sink if an endpoint is provided + let metrics_sink = if let Some(endpoint) = &self.endpoint { + use crate::sinks::opentelemetry::metrics::OpentelemetryMetricsSvc; + debug!( + "Creating OpenTelemetry metrics sink with endpoint: {}", + endpoint + ); + let namespace = self + .default_namespace + .clone() + .unwrap_or_else(|| "vector".to_string()); + + let metrics_service = OpentelemetryMetricsSvc::new( + namespace, + endpoint.clone(), + self.aggregation_temporality, + )?; + let metrics_sink = VectorSink::from_event_streamsink(metrics_service); + Some(metrics_sink) + } else { + None + }; + + // Determine the healthcheck endpoint + let healthcheck_endpoint = self.healthcheck_endpoint.clone().or_else(|| { + let Protocol::Http(config) = &self.protocol; + Some(config.uri.to_string()) + }); + + // Create a healthcheck if an endpoint is provided + let healthcheck = if let Some(endpoint) = healthcheck_endpoint { + let client = crate::http::HttpClient::new(None, cx.proxy())?; + let uri = UriSerde::from_str(&endpoint) + .map_err(|e| crate::Error::from(format!("Invalid healthcheck endpoint: {}", e)))?; + + healthcheck(uri.to_string(), client).boxed() + } else { + logs_healthcheck + }; + + // Create the final sink + let sink = if let Some(metrics_sink) = metrics_sink { + metrics_sink + } else { + logs_sink + }; + + Ok((sink, healthcheck)) } fn input(&self) -> Input { @@ -80,16 +191,88 @@ impl SinkConfig for OpenTelemetryConfig { } fn acknowledgements(&self) -> &AcknowledgementsConfig { - match self.protocol { - Protocol::Http(ref config) => config.acknowledgements(), + match &self.protocol { + Protocol::Http(config) => config.acknowledgements(), } } } +/// Healthcheck for the OpenTelemetry sink. +/// +/// Reference https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/healthcheckextension/README.md +async fn healthcheck(endpoint: String, client: crate::http::HttpClient) -> crate::Result<()> { + use crate::sinks::HealthcheckError; + use http::{Request, StatusCode}; + use hyper::Body; + + let request = Request::head(&endpoint) + .body(Body::empty()) + .map_err(|e| crate::Error::from(format!("Error building request: {}", e)))?; + + let response = client.send(request).await?; + + match response.status() { + StatusCode::OK | StatusCode::ACCEPTED | StatusCode::NO_CONTENT => Ok(()), + status => Err(HealthcheckError::UnexpectedStatus { status }.into()), + } +} + #[cfg(test)] mod test { + use super::*; + use indoc::indoc; + #[test] fn generate_config() { crate::test_util::test_generate_config::(); } + + #[test] + fn config_opentelemetry_default() { + let config = indoc! {r#" + [protocol] + type = "http" + uri = "http://localhost:5318/v1/logs" + encoding.codec = "json" + "#}; + let config: OpenTelemetryConfig = toml::from_str(config).unwrap(); + + assert!(config.endpoint.is_none()); + assert!(config.healthcheck_endpoint.is_none()); + assert_eq!(config.default_namespace, Some("vector".to_string())); + assert!(matches!( + config.aggregation_temporality, + AggregationTemporality::Cumulative + )); + } + + #[test] + fn config_opentelemetry_with_metrics() { + let config = indoc! {r#" + [protocol] + type = "http" + uri = "http://localhost:5318/v1/logs" + encoding.codec = "json" + + endpoint = "http://localhost:4317/v1/metrics" + healthcheck_endpoint = "http://localhost:13133" + default_namespace = "myservice" + aggregation_temporality = "delta" + "#}; + let config: OpenTelemetryConfig = toml::from_str(config).unwrap(); + + assert_eq!( + config.endpoint, + Some("http://localhost:4317/v1/metrics".to_string()) + ); + assert_eq!( + config.healthcheck_endpoint, + Some("http://localhost:13133".to_string()) + ); + assert_eq!(config.default_namespace, Some("myservice".to_string())); + assert!(matches!( + config.aggregation_temporality, + AggregationTemporality::Delta + )); + } } diff --git a/website/cue/reference/components/sinks/base/opentelemetry.cue b/website/cue/reference/components/sinks/base/opentelemetry.cue index de87314957157..bdc1fe699ba7c 100644 --- a/website/cue/reference/components/sinks/base/opentelemetry.cue +++ b/website/cue/reference/components/sinks/base/opentelemetry.cue @@ -1,7 +1,7 @@ package metadata base: components: sinks: opentelemetry: configuration: protocol: { - description: "Protocol configuration" + description: "Protocol configuration for logs and metrics." required: true type: object: options: { acknowledgements: { @@ -790,7 +790,7 @@ base: components: sinks: opentelemetry: configuration: protocol: { Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. - When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + When calculating the past RTT average, we also compute a secondary "deviation" value that indicates how variable those values are. We use that deviation when comparing the past RTT average to the current measurements, so we can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. @@ -1030,4 +1030,91 @@ base: components: sinks: opentelemetry: configuration: protocol: { type: string: examples: ["https://10.22.212.22:9000/endpoint"] } } + + endpoint: { + description: """ + The endpoint to send OpenTelemetry events to. + + This should be a full URL, including the protocol (e.g. `https://`). + If not specified, events will not be sent. + """ + required: false + type: string: { + examples: ["http://localhost:4317/v1/metrics", "http://localhost:4318/v1/logs"] + } + } + + healthcheck_endpoint: { + description: """ + The endpoint to send healthcheck requests to. + + This should be a full URL, including the protocol (e.g. `https://`). + """ + required: false + type: string: { + examples: ["http://localhost:13133"] + } + } + + default_namespace: { + description: """ + The default namespace to use for events that do not have one. + + Metrics with the same name can only be differentiated by their namespace. + """ + required: false + type: string: { + default: "vector" + examples: ["myservice"] + } + } + + aggregation_temporality: { + description: """ + The aggregation temporality to use for metrics. + + This determines how metrics are aggregated over time. + """ + required: false + type: string: { + default: "cumulative" + enum: { + delta: "Delta temporality means that metrics are reported as changes since the last report." + cumulative: "Cumulative temporality means that metrics are reported as cumulative changes since a fixed start time." + } + } + } + + input: { + logs: true + metrics: true + traces: false + } + + how_it_works: { + opentelemetry_protocol: { + title: "OpenTelemetry Protocol" + body: """ + The [OpenTelemetry Protocol][otlp] (OTLP) is a vendor-agnostic way to export telemetry data. + This sink supports sending logs, metrics, and traces to any OTLP-compatible backend. + + [otlp]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md + """ + } + + metrics_export: { + title: "Metrics Export" + body: """ + When the `metrics_endpoint` is configured, Vector will export metrics to the specified endpoint. + + Vector uses the OpenTelemetry SDK to export metrics, which provides compatibility with the OpenTelemetry protocol + and handles details like batching and retries. + + The `aggregation_temporality` setting controls how metrics are aggregated over time: + + - **Delta**: Metrics are reported as changes since the last report. + - **Cumulative**: Metrics are reported as cumulative changes since a fixed start time. + """ + } + } }