Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 87 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ number_prefix = { version = "0.4.0", default-features = false, features = ["std"
ratatui = { version = "0.29.0", optional = true, default-features = false, features = ["crossterm"] }

# Opentelemetry
opentelemetry = { version = "0.28.0", optional = true, features = ["metrics"] }
opentelemetry_sdk = { version = "0.28.0", optional = true, features = ["metrics", "experimental_async_runtime", "rt-tokio"] }
opentelemetry-otlp = { version = "0.28.0", optional = true, features = ["metrics", "http-proto", "grpc-tonic"] }

hex = { version = "0.4.3", default-features = false, optional = true }

Expand Down Expand Up @@ -775,6 +778,7 @@ sinks-metrics = [
"sinks-humio",
"sinks-influxdb",
"sinks-kafka",
"sinks-opentelemetry",
"sinks-prometheus",
"sinks-sematext",
"sinks-statsd",
Expand Down Expand Up @@ -821,7 +825,7 @@ sinks-mqtt = ["dep:rumqttc"]
sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-opentelemetry = ["sinks-http"]
sinks-opentelemetry = ["sinks-http", "vector-lib/opentelemetry", "dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp"]
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"]
sinks-postgres = ["dep:sqlx"]
Expand Down Expand Up @@ -1015,6 +1019,7 @@ loki-benches = ["sinks-loki"]
enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
proptest = ["dep:proptest", "dep:proptest-derive", "vrl/proptest"]


[[bench]]
name = "default"
harness = false
Expand Down
5 changes: 5 additions & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ openssl,https://github.com/sfackler/rust-openssl,Apache-2.0,Steven Fackler <sfac
openssl-macros,https://github.com/sfackler/rust-openssl,MIT OR Apache-2.0,The openssl-macros Authors
openssl-probe,https://github.com/alexcrichton/openssl-probe,MIT OR Apache-2.0,Alex Crichton <alex@alexcrichton.com>
openssl-sys,https://github.com/sfackler/rust-openssl,MIT,"Alex Crichton <alex@alexcrichton.com>, Steven Fackler <sfackler@gmail.com>"
opentelemetry,https://github.com/open-telemetry/opentelemetry-rust,Apache-2.0,The opentelemetry Authors
opentelemetry-http,https://github.com/open-telemetry/opentelemetry-rust,Apache-2.0,The opentelemetry-http Authors
opentelemetry-otlp,https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-otlp,Apache-2.0,The opentelemetry-otlp Authors
opentelemetry-proto,https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-proto,Apache-2.0,The opentelemetry-proto Authors
opentelemetry_sdk,https://github.com/open-telemetry/opentelemetry-rust,Apache-2.0,The opentelemetry_sdk Authors
ordered-float,https://github.com/reem/rust-ordered-float,MIT,"Jonathan Reem <jonathan.reem@gmail.com>, Matt Brubeck <mbrubeck@limpet.net>"
outref,https://github.com/Nugine/outref,MIT,The outref Authors
overload,https://github.com/danaugrs/overload,MIT,Daniel Salvadori <danaugrs@gmail.com>
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/opentelemetry_metrics_sink.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added support for sendings metrics via the OpenTelemetry sink to OpenTelemetry collectors

Check warning on line 1 in changelog.d/opentelemetry_metrics_sink.feature.md

View workflow job for this annotation

GitHub Actions / Check Spelling

`sendings` is not a recognized word. (unrecognized-spelling)

authors: brittonhayes
2 changes: 1 addition & 1 deletion lib/opentelemetry-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ fn main() -> Result<(), Error> {
"src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto",
],
&["src/proto/opentelemetry-proto"],
Expand Down
201 changes: 201 additions & 0 deletions src/sinks/opentelemetry/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use crate::event::metric::{Metric as VectorMetric, MetricValue};
use std::task::{Context, Poll};
use vector_config::configurable_component;

use futures::future::{self, BoxFuture};
use http::StatusCode;
use hyper::Body;
use tower::Service;
use tracing::{debug, trace};
use vector_lib::event::EventStatus;

use opentelemetry::metrics::{Meter, MeterProvider};
use opentelemetry::KeyValue;
use opentelemetry_otlp::{MetricExporter, WithExportConfig};
use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality};

use crate::event::Event;
use crate::sinks::util::PartitionInnerBuffer;
use futures_util::stream::BoxStream;
use vector_lib::sink::StreamSink;

/// The aggregation temporality to use for metrics.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(rename_all = "snake_case")]
pub enum AggregationTemporality {
/// Delta temporality means that metrics are reported as changes since the last report.
Delta,
/// Cumulative temporality means that metrics are reported as cumulative changes since a fixed start time.
Cumulative,
}

impl Default for AggregationTemporality {
fn default() -> Self {
Self::Cumulative
}
}

// Add conversion from AggregationTemporality to the OpenTelemetry SDK's Temporality
impl From<AggregationTemporality> for Temporality {
fn from(temporality: AggregationTemporality) -> Self {
match temporality {
AggregationTemporality::Delta => Temporality::Delta,
AggregationTemporality::Cumulative => Temporality::Cumulative,
}
}
}

#[derive(Default)]
pub struct OpentelemetryMetricNormalize;

// Implementation using the OpenTelemetry SDK
pub struct OpentelemetryMetricsSvc {
meter_provider: SdkMeterProvider,
meter: Meter,
namespace: String,
}

impl OpentelemetryMetricsSvc {
pub fn new(
namespace: String,
endpoint: String,
temporality: AggregationTemporality,
) -> crate::Result<Self> {
// Create the exporter
let exporter = MetricExporter::builder()
.with_http()
.with_endpoint(endpoint)
.with_temporality(Temporality::from(temporality))
.build()
.map_err(|e| crate::Error::from(format!("Failed to build metrics exporter: {}", e)))?;

// Create the meter provider with the exporter
let provider = SdkMeterProvider::builder()
.with_periodic_exporter(exporter)
.build();

let meter = provider.meter("vector");

Ok(Self {
meter_provider: provider,
meter,
namespace,
})
}

// Convert and record Vector metrics using the OpenTelemetry SDK
fn convert_and_record_metrics(&self, events: Vec<VectorMetric>) {
for event in events {
let metric_name = event.name().to_string();
let attributes = event
.tags()
.map(|tags| {
tags.iter_single()
.map(|(k, v)| KeyValue::new(k.to_string(), v.to_string()))
.collect::<Vec<_>>()
})
.unwrap_or_default();

// Add the service.name attribute with the namespace
let mut all_attributes = vec![KeyValue::new("service.name", self.namespace.clone())];
all_attributes.extend(attributes);

match event.value() {
MetricValue::Counter { value } => {
let counter = self.meter.f64_counter(metric_name).build();
counter.add(*value, &all_attributes);
}
MetricValue::Gauge { value } => {
// For gauges, we use a counter since observable gauges require callbacks
let counter = self
.meter
.f64_counter(format!("{}_gauge", metric_name))
.build();
counter.add(*value, &all_attributes);
}
MetricValue::Distribution { samples, .. } => {
let histogram = self.meter.f64_histogram(metric_name).build();
for sample in samples {
// Record each sample with its rate
for _ in 0..sample.rate {
histogram.record(sample.value, &all_attributes);
}
}
}
MetricValue::Set { values } => {
// For sets, we record the count of unique values
let counter = self
.meter
.f64_counter(format!("{}_set", metric_name))
.build();
counter.add(values.len() as f64, &all_attributes);
}
_ => {}
}
}
}
}

impl Service<PartitionInnerBuffer<Vec<VectorMetric>, String>> for OpentelemetryMetricsSvc {
type Response = http::Response<Body>;
type Error = crate::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, items: PartitionInnerBuffer<Vec<VectorMetric>, String>) -> Self::Future {
let (metrics, _namespace) = items.into_parts();

// Convert and record metrics
self.convert_and_record_metrics(metrics);

// The SDK handles the export asynchronously, so we just return a success response
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this desirable behaviour for vector? Seems like you loose end to end acks with this. Would it be better to handle the sending in vector itself and only use the SDK for constructing and serializing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really good point. I'd like to keep acks and stay consistent with other sinks in terms of shipping flow. Will refine to just use the sdk for serializing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave this an initial try and it might take some additional work to sort out.

I believe this would require us to create an OTLP metric exporter that integrates with Vector's built in buffer/acknowledgements/retry functionality

Right now we're making an exporter using their builder method, but we could likely make a dedicated exporter without the builder that is specific to Vector.

/// Today's approach
let exporter = MetricExporter::builder()
    .with_http()
    .with_endpoint(endpoint)
    .with_temporality(Temporality::from(temporality))
    .build()
    .map_err(|e| crate::Error::from(format!("Failed to build metrics exporter: {}", e)))?;

Here's what we'd have to support

/// seen in
/// opentelemetry_otlp::metric

pub struct MetricExporter {
    client: Box<dyn MetricsClient>,
    temporality: Temporality,
}


impl Debug for MetricExporter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MetricExporter").finish()
    }
}

#[async_trait]
impl PushMetricExporter for MetricExporter {
    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
        self.client.export(metrics).await
    }

    async fn force_flush(&self) -> OTelSdkResult {
        // this component is stateless
        Ok(())
    }

    fn shutdown(&self) -> OTelSdkResult {
        self.client.shutdown()
    }

    fn temporality(&self) -> Temporality {
        self.temporality
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional challenge, this is not exported outside the otlp crate, meaning its not quite this easy to make our own exporter metrics client.

#[async_trait]
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
    fn shutdown(&self) -> OTelSdkResult;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does using the SDK provide a lot of value? I'm not sure its usecase aligns with vector that well. Trying to integrate it might be more hassle than its worth.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

honestly the more I use it the more it does feel like it's adding more hassle than help.

The benefits of the sdk is built in aggregation of metrics though which is helpful. Other than that, it seems to add a lot of abstraction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While working on the logging support i came to the same conclusion. Let me see if I can get my PR open as draft today, maybe we can converge on the same direction for it. I used the gcp stackdriver sink as a starting point for a sink that allows for custom encoding. Not sure if its right direction, adapting the http sink could also work, but maybe we can avoid some double work

Box::pin(future::ok(
http::Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
))
}
}

impl Drop for OpentelemetryMetricsSvc {
fn drop(&mut self) {
// Ensure metrics are exported before shutting down
if let Err(err) = self.meter_provider.shutdown() {
error!("Error shutting down meter provider: {:?}", err);
}
}
}

#[async_trait::async_trait]
impl StreamSink<Event> for OpentelemetryMetricsSvc {
async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
use futures::StreamExt;

debug!("OpenTelemetry metrics sink started");

while let Some(mut event) = input.next().await {
// Extract finalizers before processing
let finalizers = event.metadata_mut().take_finalizers();

// Extract metrics from the event
if let Event::Metric(metric) = event {
trace!("Processing metric event: {}", metric.name());
// Process the metric
self.convert_and_record_metrics(vec![metric]);
} else {
trace!("Ignoring non-metric event");
}

// Finalize the event with success status
finalizers.update_status(EventStatus::Delivered);
}

debug!("OpenTelemetry metrics sink stopped");
Ok(())
}
}
Loading
Loading