From 3a0f035187d937c3dcf1852f4be18c264def5239 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 12 Aug 2025 15:26:37 -0400 Subject: [PATCH 1/4] chore(opentelemetry source): config module and reduce duplication --- src/sources/opentelemetry/config.rs | 328 +++++++++++++++++++++++++++ src/sources/opentelemetry/grpc.rs | 2 +- src/sources/opentelemetry/http.rs | 8 +- src/sources/opentelemetry/mod.rs | 332 +--------------------------- src/sources/opentelemetry/tests.rs | 178 ++++++--------- 5 files changed, 400 insertions(+), 448 deletions(-) create mode 100644 src/sources/opentelemetry/config.rs diff --git a/src/sources/opentelemetry/config.rs b/src/sources/opentelemetry/config.rs new file mode 100644 index 0000000000000..e9763b762c1b4 --- /dev/null +++ b/src/sources/opentelemetry/config.rs @@ -0,0 +1,328 @@ +use std::net::SocketAddr; + +use futures::FutureExt; +use futures_util::future::join; +use futures_util::TryFutureExt; +use tonic::codec::CompressionEncoding; +use vector_lib::lookup::{owned_value_path, OwnedTargetPath}; +use vector_lib::opentelemetry::logs::{ + ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY, RESOURCE_KEY, + SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, +}; + +use crate::sources::http_server::{build_param_matcher, remove_duplicates}; +use crate::sources::opentelemetry::grpc::Service; +use crate::sources::opentelemetry::http::{build_warp_filter, run_http_server}; +use crate::{ + config::{ + DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, + SourceContext, SourceOutput, + }, + http::KeepaliveConfig, + serde::bool_or_struct, + sources::{util::grpc::run_grpc_server_with_routes, Source}, + tls::{MaybeTlsSettings, TlsEnableableConfig}, +}; +use tonic::transport::server::RoutesBuilder; +use vector_lib::configurable::configurable_component; +use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; +use vector_lib::opentelemetry::proto::collector::{ + logs::v1::logs_service_server::LogsServiceServer, + metrics::v1::metrics_service_server::MetricsServiceServer, + trace::v1::trace_service_server::TraceServiceServer, +}; +use vector_lib::{ + config::{log_schema, LegacyKey, LogNamespace}, + schema::Definition, +}; +use vrl::value::{kind::Collection, Kind}; + +pub const LOGS: &str = "logs"; +pub const METRICS: &str = "metrics"; +pub const TRACES: &str = "traces"; + +/// Configuration for the `opentelemetry` source. +#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct OpentelemetryConfig { + #[configurable(derived)] + pub grpc: GrpcConfig, + + #[configurable(derived)] + pub http: HttpConfig, + + #[configurable(derived)] + #[serde(default, deserialize_with = "bool_or_struct")] + pub acknowledgements: SourceAcknowledgementsConfig, + + /// The namespace to use for logs. This overrides the global setting. + #[configurable(metadata(docs::hidden))] + #[serde(default)] + pub log_namespace: Option, +} + +/// Configuration for the `opentelemetry` gRPC server. +#[configurable_component] +#[configurable(metadata(docs::examples = "example_grpc_config()"))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct GrpcConfig { + /// The socket address to listen for connections on. + /// + /// It _must_ include a port. + #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))] + pub address: SocketAddr, + + #[configurable(derived)] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tls: Option, +} + +fn example_grpc_config() -> GrpcConfig { + GrpcConfig { + address: "0.0.0.0:4317".parse().unwrap(), + tls: None, + } +} + +/// Configuration for the `opentelemetry` HTTP server. +#[configurable_component] +#[configurable(metadata(docs::examples = "example_http_config()"))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct HttpConfig { + /// The socket address to listen for connections on. + /// + /// It _must_ include a port. + #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))] + pub address: SocketAddr, + + #[configurable(derived)] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tls: Option, + + #[configurable(derived)] + #[serde(default)] + pub keepalive: KeepaliveConfig, + + /// A list of HTTP headers to include in the log event. + /// + /// Accepts the wildcard (`*`) character for headers matching a specified pattern. + /// + /// Specifying "*" results in all headers included in the log event. + /// + /// These headers are not included in the JSON payload if a field with a conflicting name exists. + #[serde(default)] + #[configurable(metadata(docs::examples = "User-Agent"))] + #[configurable(metadata(docs::examples = "X-My-Custom-Header"))] + #[configurable(metadata(docs::examples = "X-*"))] + #[configurable(metadata(docs::examples = "*"))] + pub headers: Vec, +} + +fn example_http_config() -> HttpConfig { + HttpConfig { + address: "0.0.0.0:4318".parse().unwrap(), + tls: None, + keepalive: KeepaliveConfig::default(), + headers: vec![], + } +} + +impl GenerateConfig for OpentelemetryConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + grpc: example_grpc_config(), + http: example_http_config(), + acknowledgements: Default::default(), + log_namespace: None, + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "opentelemetry")] +impl SourceConfig for OpentelemetryConfig { + async fn build(&self, cx: SourceContext) -> crate::Result { + let acknowledgements = cx.do_acknowledgements(self.acknowledgements); + let events_received = register!(EventsReceived); + let log_namespace = cx.log_namespace(self.log_namespace); + + let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?; + + let log_service = LogsServiceServer::new(Service { + pipeline: cx.out.clone(), + acknowledgements, + log_namespace, + events_received: events_received.clone(), + }) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); + + let trace_service = TraceServiceServer::new(Service { + pipeline: cx.out.clone(), + acknowledgements, + log_namespace, + events_received: events_received.clone(), + }) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); + + let metrics_service = MetricsServiceServer::new(Service { + pipeline: cx.out.clone(), + acknowledgements, + log_namespace, + events_received: events_received.clone(), + }) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); + + let mut builder = RoutesBuilder::default(); + builder + .add_service(log_service) + .add_service(metrics_service) + .add_service(trace_service); + let grpc_source = run_grpc_server_with_routes( + self.grpc.address, + grpc_tls_settings, + builder.routes(), + cx.shutdown.clone(), + ) + .map_err(|error| { + error!(message = "Source future failed.", %error); + }); + + let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?; + let protocol = http_tls_settings.http_protocol_name(); + let bytes_received = register!(BytesReceived::from(Protocol::from(protocol))); + let headers = + build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?; + let filters = build_warp_filter( + acknowledgements, + log_namespace, + cx.out, + bytes_received, + events_received, + headers, + ); + let http_source = run_http_server( + self.http.address, + http_tls_settings, + filters, + cx.shutdown, + self.http.keepalive.clone(), + ); + + Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed()) + } + + // TODO: appropriately handle "severity" meaning across both "severity_text" and "severity_number", + // as both are optional and can be converted to/from. + fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { + let log_namespace = global_log_namespace.merge(self.log_namespace); + let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace]) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))), + &owned_value_path!(RESOURCE_KEY), + Kind::object(Collection::from_unknown(Kind::any())).or_undefined(), + None, + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))), + &owned_value_path!(ATTRIBUTES_KEY), + Kind::object(Collection::from_unknown(Kind::any())).or_undefined(), + None, + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))), + &owned_value_path!(TRACE_ID_KEY), + Kind::bytes().or_undefined(), + None, + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))), + &owned_value_path!(SPAN_ID_KEY), + Kind::bytes().or_undefined(), + None, + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))), + &owned_value_path!(SEVERITY_TEXT_KEY), + Kind::bytes().or_undefined(), + Some("severity"), + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))), + &owned_value_path!(SEVERITY_NUMBER_KEY), + Kind::integer().or_undefined(), + None, + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))), + &owned_value_path!(FLAGS_KEY), + Kind::integer().or_undefined(), + None, + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!( + DROPPED_ATTRIBUTES_COUNT_KEY + ))), + &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY), + Kind::integer(), + None, + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::Overwrite(owned_value_path!( + OBSERVED_TIMESTAMP_KEY + ))), + &owned_value_path!(OBSERVED_TIMESTAMP_KEY), + Kind::timestamp(), + None, + ) + .with_source_metadata( + Self::NAME, + None, + &owned_value_path!("timestamp"), + Kind::timestamp(), + Some("timestamp"), + ) + .with_standard_vector_source_metadata(); + + let schema_definition = match log_namespace { + LogNamespace::Vector => { + schema_definition.with_meaning(OwnedTargetPath::event_root(), "message") + } + LogNamespace::Legacy => { + schema_definition.with_meaning(log_schema().owned_message_path(), "message") + } + }; + + vec![ + SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS), + SourceOutput::new_metrics().with_port(METRICS), + SourceOutput::new_traces().with_port(TRACES), + ] + } + + fn resources(&self) -> Vec { + vec![ + Resource::tcp(self.grpc.address), + Resource::tcp(self.http.address), + ] + } + + fn can_acknowledge(&self) -> bool { + true + } +} diff --git a/src/sources/opentelemetry/grpc.rs b/src/sources/opentelemetry/grpc.rs index 32a4d766c17a2..6b894b8c626e0 100644 --- a/src/sources/opentelemetry/grpc.rs +++ b/src/sources/opentelemetry/grpc.rs @@ -21,7 +21,7 @@ use vector_lib::{ use crate::{ internal_events::{EventsReceived, StreamClosedError}, - sources::opentelemetry::{LOGS, METRICS, TRACES}, + sources::opentelemetry::config::{LOGS, METRICS, TRACES}, SourceSender, }; diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index d3c3798dd14d6..c3c9fc51f4ac7 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -42,8 +42,8 @@ use crate::{ SourceSender, }; -use super::OpentelemetryConfig; use super::{reply::protobuf, status::Status}; +use crate::sources::opentelemetry::config::{OpentelemetryConfig, LOGS, METRICS, TRACES}; #[derive(Clone, Copy, Debug, Snafu)] pub(crate) enum ApiError { @@ -171,7 +171,7 @@ fn build_warp_log_filter( events, acknowledgements, out.clone(), - super::LOGS, + LOGS, ExportLogsServiceResponse::default(), ) }, @@ -203,7 +203,7 @@ fn build_warp_metrics_filter( events, acknowledgements, out.clone(), - super::METRICS, + METRICS, ExportMetricsServiceResponse::default(), ) }) @@ -234,7 +234,7 @@ fn build_warp_trace_filter( events, acknowledgements, out.clone(), - super::TRACES, + TRACES, ExportTraceServiceResponse::default(), ) }) diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index 8f79673d86186..d67129054a795 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -7,334 +7,4 @@ mod grpc; mod http; mod reply; mod status; - -use std::net::SocketAddr; - -use futures::{future::join, FutureExt, TryFutureExt}; -use tonic::codec::CompressionEncoding; -use vector_lib::lookup::{owned_value_path, OwnedTargetPath}; -use vector_lib::opentelemetry::logs::{ - ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY, RESOURCE_KEY, - SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, -}; - -use tonic::transport::server::RoutesBuilder; -use vector_lib::configurable::configurable_component; -use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; -use vector_lib::opentelemetry::proto::collector::{ - logs::v1::logs_service_server::LogsServiceServer, - metrics::v1::metrics_service_server::MetricsServiceServer, - trace::v1::trace_service_server::TraceServiceServer, -}; -use vector_lib::{ - config::{log_schema, LegacyKey, LogNamespace}, - schema::Definition, -}; -use vrl::value::{kind::Collection, Kind}; - -use self::{ - grpc::Service, - http::{build_warp_filter, run_http_server}, -}; -use crate::{ - config::{ - DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, - SourceContext, SourceOutput, - }, - http::KeepaliveConfig, - serde::bool_or_struct, - sources::{util::grpc::run_grpc_server_with_routes, Source}, - tls::{MaybeTlsSettings, TlsEnableableConfig}, -}; - -use super::http_server::{build_param_matcher, remove_duplicates}; - -pub const LOGS: &str = "logs"; -pub const METRICS: &str = "metrics"; -pub const TRACES: &str = "traces"; - -/// Configuration for the `opentelemetry` source. -#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -pub struct OpentelemetryConfig { - #[configurable(derived)] - grpc: GrpcConfig, - - #[configurable(derived)] - http: HttpConfig, - - #[configurable(derived)] - #[serde(default, deserialize_with = "bool_or_struct")] - acknowledgements: SourceAcknowledgementsConfig, - - /// The namespace to use for logs. This overrides the global setting. - #[configurable(metadata(docs::hidden))] - #[serde(default)] - log_namespace: Option, -} - -/// Configuration for the `opentelemetry` gRPC server. -#[configurable_component] -#[configurable(metadata(docs::examples = "example_grpc_config()"))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -struct GrpcConfig { - /// The socket address to listen for connections on. - /// - /// It _must_ include a port. - #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))] - address: SocketAddr, - - #[configurable(derived)] - #[serde(default, skip_serializing_if = "Option::is_none")] - tls: Option, -} - -fn example_grpc_config() -> GrpcConfig { - GrpcConfig { - address: "0.0.0.0:4317".parse().unwrap(), - tls: None, - } -} - -/// Configuration for the `opentelemetry` HTTP server. -#[configurable_component] -#[configurable(metadata(docs::examples = "example_http_config()"))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -struct HttpConfig { - /// The socket address to listen for connections on. - /// - /// It _must_ include a port. - #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))] - address: SocketAddr, - - #[configurable(derived)] - #[serde(default, skip_serializing_if = "Option::is_none")] - tls: Option, - - #[configurable(derived)] - #[serde(default)] - keepalive: KeepaliveConfig, - - /// A list of HTTP headers to include in the log event. - /// - /// Accepts the wildcard (`*`) character for headers matching a specified pattern. - /// - /// Specifying "*" results in all headers included in the log event. - /// - /// These headers are not included in the JSON payload if a field with a conflicting name exists. - #[serde(default)] - #[configurable(metadata(docs::examples = "User-Agent"))] - #[configurable(metadata(docs::examples = "X-My-Custom-Header"))] - #[configurable(metadata(docs::examples = "X-*"))] - #[configurable(metadata(docs::examples = "*"))] - headers: Vec, -} - -fn example_http_config() -> HttpConfig { - HttpConfig { - address: "0.0.0.0:4318".parse().unwrap(), - tls: None, - keepalive: KeepaliveConfig::default(), - headers: vec![], - } -} - -impl GenerateConfig for OpentelemetryConfig { - fn generate_config() -> toml::Value { - toml::Value::try_from(Self { - grpc: example_grpc_config(), - http: example_http_config(), - acknowledgements: Default::default(), - log_namespace: None, - }) - .unwrap() - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "opentelemetry")] -impl SourceConfig for OpentelemetryConfig { - async fn build(&self, cx: SourceContext) -> crate::Result { - let acknowledgements = cx.do_acknowledgements(self.acknowledgements); - let events_received = register!(EventsReceived); - let log_namespace = cx.log_namespace(self.log_namespace); - - let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?; - - let log_service = LogsServiceServer::new(Service { - pipeline: cx.out.clone(), - acknowledgements, - log_namespace, - events_received: events_received.clone(), - }) - .accept_compressed(CompressionEncoding::Gzip) - .max_decoding_message_size(usize::MAX); - - let trace_service = TraceServiceServer::new(Service { - pipeline: cx.out.clone(), - acknowledgements, - log_namespace, - events_received: events_received.clone(), - }) - .accept_compressed(CompressionEncoding::Gzip) - .max_decoding_message_size(usize::MAX); - - let metrics_service = MetricsServiceServer::new(Service { - pipeline: cx.out.clone(), - acknowledgements, - log_namespace, - events_received: events_received.clone(), - }) - .accept_compressed(CompressionEncoding::Gzip) - .max_decoding_message_size(usize::MAX); - - let mut builder = RoutesBuilder::default(); - builder - .add_service(log_service) - .add_service(metrics_service) - .add_service(trace_service); - let grpc_source = run_grpc_server_with_routes( - self.grpc.address, - grpc_tls_settings, - builder.routes(), - cx.shutdown.clone(), - ) - .map_err(|error| { - error!(message = "Source future failed.", %error); - }); - - let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?; - let protocol = http_tls_settings.http_protocol_name(); - let bytes_received = register!(BytesReceived::from(Protocol::from(protocol))); - let headers = - build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?; - let filters = build_warp_filter( - acknowledgements, - log_namespace, - cx.out, - bytes_received, - events_received, - headers, - ); - let http_source = run_http_server( - self.http.address, - http_tls_settings, - filters, - cx.shutdown, - self.http.keepalive.clone(), - ); - - Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed()) - } - - // TODO: appropriately handle "severity" meaning across both "severity_text" and "severity_number", - // as both are optional and can be converted to/from. - fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { - let log_namespace = global_log_namespace.merge(self.log_namespace); - let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace]) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))), - &owned_value_path!(RESOURCE_KEY), - Kind::object(Collection::from_unknown(Kind::any())).or_undefined(), - None, - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))), - &owned_value_path!(ATTRIBUTES_KEY), - Kind::object(Collection::from_unknown(Kind::any())).or_undefined(), - None, - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))), - &owned_value_path!(TRACE_ID_KEY), - Kind::bytes().or_undefined(), - None, - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))), - &owned_value_path!(SPAN_ID_KEY), - Kind::bytes().or_undefined(), - None, - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))), - &owned_value_path!(SEVERITY_TEXT_KEY), - Kind::bytes().or_undefined(), - Some("severity"), - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))), - &owned_value_path!(SEVERITY_NUMBER_KEY), - Kind::integer().or_undefined(), - None, - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))), - &owned_value_path!(FLAGS_KEY), - Kind::integer().or_undefined(), - None, - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!( - DROPPED_ATTRIBUTES_COUNT_KEY - ))), - &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY), - Kind::integer(), - None, - ) - .with_source_metadata( - Self::NAME, - Some(LegacyKey::Overwrite(owned_value_path!( - OBSERVED_TIMESTAMP_KEY - ))), - &owned_value_path!(OBSERVED_TIMESTAMP_KEY), - Kind::timestamp(), - None, - ) - .with_source_metadata( - Self::NAME, - None, - &owned_value_path!("timestamp"), - Kind::timestamp(), - Some("timestamp"), - ) - .with_standard_vector_source_metadata(); - - let schema_definition = match log_namespace { - LogNamespace::Vector => { - schema_definition.with_meaning(OwnedTargetPath::event_root(), "message") - } - LogNamespace::Legacy => { - schema_definition.with_meaning(log_schema().owned_message_path(), "message") - } - }; - - vec![ - SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS), - SourceOutput::new_metrics().with_port(METRICS), - SourceOutput::new_traces().with_port(TRACES), - ] - } - - fn resources(&self) -> Vec { - vec![ - Resource::tcp(self.grpc.address), - Resource::tcp(self.http.address), - ] - } - - fn can_acknowledge(&self) -> bool { - true - } -} +mod config; diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index a0fa9868acc58..5cc33b0a687e0 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -6,7 +6,7 @@ use crate::{ event::{ into_event_stream, Event, EventStatus, LogEvent, Metric as MetricEvent, ObjectMap, Value, }, - sources::opentelemetry::{GrpcConfig, HttpConfig, OpentelemetryConfig, LOGS, METRICS}, + sources::opentelemetry::config::{GrpcConfig, HttpConfig, OpentelemetryConfig, LOGS, METRICS}, test_util::{ self, components::{assert_source_compliance, SOURCE_TAGS}, @@ -24,27 +24,75 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tonic::Request; use vector_lib::config::LogNamespace; use vector_lib::lookup::path; -use vector_lib::opentelemetry::proto::collector::metrics::v1::metrics_service_client::MetricsServiceClient; -use vector_lib::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; -use vector_lib::opentelemetry::proto::common::v1::any_value::Value::StringValue; -use vector_lib::opentelemetry::proto::metrics::v1::exponential_histogram_data_point::Buckets; -use vector_lib::opentelemetry::proto::metrics::v1::metric::Data; -use vector_lib::opentelemetry::proto::metrics::v1::summary_data_point::ValueAtQuantile; -use vector_lib::opentelemetry::proto::metrics::v1::{ - AggregationTemporality, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, Histogram, - HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, Summary, - SummaryDataPoint, -}; -use vector_lib::opentelemetry::proto::resource::v1::Resource; use vector_lib::opentelemetry::proto::{ - collector::logs::v1::{logs_service_client::LogsServiceClient, ExportLogsServiceRequest}, - common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue}, + collector::{ + logs::v1::{logs_service_client::LogsServiceClient, ExportLogsServiceRequest}, + metrics::v1::{metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest}, + }, + common::v1::{any_value, any_value::Value::StringValue, AnyValue, InstrumentationScope, KeyValue}, logs::v1::{LogRecord, ResourceLogs, ScopeLogs}, - resource::v1::Resource as OtelResource, + metrics::v1::{ + exponential_histogram_data_point::Buckets, metric::Data, summary_data_point::ValueAtQuantile, AggregationTemporality, + ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, Histogram, HistogramDataPoint, Metric, + NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, + Summary, SummaryDataPoint, + }, + resource::v1::{Resource, Resource as OtelResource}, }; use vrl::value; use warp::http::HeaderMap; +fn create_test_logs_request() -> Request { + Request::new(ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(OtelResource { + attributes: vec![KeyValue { + key: "res_key".into(), + value: Some(AnyValue { + value: Some(StringValue("res_val".into())), + }), + }], + dropped_attributes_count: 0, + }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "some.scope.name".into(), + version: "1.2.3".into(), + attributes: vec![KeyValue { + key: "scope_attr".into(), + value: Some(AnyValue { + value: Some(StringValue("scope_val".into())), + }), + }], + dropped_attributes_count: 7, + }), + log_records: vec![LogRecord { + time_unix_nano: 1, + observed_time_unix_nano: 2, + severity_number: 9, + severity_text: "info".into(), + body: Some(AnyValue { + value: Some(StringValue("log body".into())), + }), + attributes: vec![KeyValue { + key: "attr_key".into(), + value: Some(AnyValue { + value: Some(StringValue("attr_val".into())), + }), + }], + dropped_attributes_count: 3, + flags: 4, + // opentelemetry sdk will hex::decode the given trace_id and span_id + trace_id: str_into_hex_bytes("4ac52aadf321c2e531db005df08792f5"), + span_id: str_into_hex_bytes("0b9e4bda2a55530d"), + }], + schema_url: "v1".into(), + }], + schema_url: "v1".into(), + }], + }) +} + #[test] fn generate_config() { test_util::test_generate_config::(); @@ -64,54 +112,7 @@ async fn receive_grpc_logs_vector_namespace() { let mut client = LogsServiceClient::connect(format!("http://{}", env.grpc_addr)) .await .unwrap(); - let req = Request::new(ExportLogsServiceRequest { - resource_logs: vec![ResourceLogs { - resource: Some(OtelResource { - attributes: vec![KeyValue { - key: "res_key".into(), - value: Some(AnyValue { - value: Some(StringValue("res_val".into())), - }), - }], - dropped_attributes_count: 0, - }), - scope_logs: vec![ScopeLogs { - scope: Some(InstrumentationScope { - name: "some.scope.name".into(), - version: "1.2.3".into(), - attributes: vec![KeyValue { - key: "scope_attr".into(), - value: Some(AnyValue { - value: Some(StringValue("scope_val".into())), - }), - }], - dropped_attributes_count: 7, - }), - log_records: vec![LogRecord { - time_unix_nano: 1, - observed_time_unix_nano: 2, - severity_number: 9, - severity_text: "info".into(), - body: Some(AnyValue { - value: Some(StringValue("log body".into())), - }), - attributes: vec![KeyValue { - key: "attr_key".into(), - value: Some(AnyValue { - value: Some(StringValue("attr_val".into())), - }), - }], - dropped_attributes_count: 3, - flags: 4, - // opentelemetry sdk will hex::decode the given trace_id and span_id - trace_id: str_into_hex_bytes("4ac52aadf321c2e531db005df08792f5"), - span_id: str_into_hex_bytes("0b9e4bda2a55530d"), - }], - schema_url: "v1".into(), - }], - schema_url: "v1".into(), - }], - }); + let req = create_test_logs_request(); _ = client.export(req).await; let mut output = test_util::collect_ready(env.output).await; // we just send one, so only one output @@ -209,54 +210,7 @@ async fn receive_grpc_logs_legacy_namespace() { let mut client = LogsServiceClient::connect(format!("http://{}", env.grpc_addr)) .await .unwrap(); - let req = Request::new(ExportLogsServiceRequest { - resource_logs: vec![ResourceLogs { - resource: Some(OtelResource { - attributes: vec![KeyValue { - key: "res_key".into(), - value: Some(AnyValue { - value: Some(StringValue("res_val".into())), - }), - }], - dropped_attributes_count: 0, - }), - scope_logs: vec![ScopeLogs { - scope: Some(InstrumentationScope { - name: "some.scope.name".into(), - version: "1.2.3".into(), - attributes: vec![KeyValue { - key: "scope_attr".into(), - value: Some(AnyValue { - value: Some(StringValue("scope_val".into())), - }), - }], - dropped_attributes_count: 7, - }), - log_records: vec![LogRecord { - time_unix_nano: 1, - observed_time_unix_nano: 2, - severity_number: 9, - severity_text: "info".into(), - body: Some(AnyValue { - value: Some(StringValue("log body".into())), - }), - attributes: vec![KeyValue { - key: "attr_key".into(), - value: Some(AnyValue { - value: Some(StringValue("attr_val".into())), - }), - }], - dropped_attributes_count: 3, - flags: 4, - // opentelemetry sdk will hex::decode the given trace_id and span_id - trace_id: str_into_hex_bytes("4ac52aadf321c2e531db005df08792f5"), - span_id: str_into_hex_bytes("0b9e4bda2a55530d"), - }], - schema_url: "v1".into(), - }], - schema_url: "v1".into(), - }], - }); + let req = create_test_logs_request(); _ = client.export(req).await; let mut output = test_util::collect_ready(env.output).await; // we just send one, so only one output From 20e04e2489b66c0e653f89a4255a9fc14bdcd138 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 12 Aug 2025 15:26:57 -0400 Subject: [PATCH 2/4] ran cargo fmt --- src/sources/opentelemetry/config.rs | 20 ++++++++++---------- src/sources/opentelemetry/mod.rs | 2 +- src/sources/opentelemetry/tests.rs | 12 +++++++----- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/sources/opentelemetry/config.rs b/src/sources/opentelemetry/config.rs index e9763b762c1b4..147e11d731f19 100644 --- a/src/sources/opentelemetry/config.rs +++ b/src/sources/opentelemetry/config.rs @@ -138,7 +138,7 @@ impl GenerateConfig for OpentelemetryConfig { acknowledgements: Default::default(), log_namespace: None, }) - .unwrap() + .unwrap() } } @@ -158,8 +158,8 @@ impl SourceConfig for OpentelemetryConfig { log_namespace, events_received: events_received.clone(), }) - .accept_compressed(CompressionEncoding::Gzip) - .max_decoding_message_size(usize::MAX); + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); let trace_service = TraceServiceServer::new(Service { pipeline: cx.out.clone(), @@ -167,8 +167,8 @@ impl SourceConfig for OpentelemetryConfig { log_namespace, events_received: events_received.clone(), }) - .accept_compressed(CompressionEncoding::Gzip) - .max_decoding_message_size(usize::MAX); + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); let metrics_service = MetricsServiceServer::new(Service { pipeline: cx.out.clone(), @@ -176,8 +176,8 @@ impl SourceConfig for OpentelemetryConfig { log_namespace, events_received: events_received.clone(), }) - .accept_compressed(CompressionEncoding::Gzip) - .max_decoding_message_size(usize::MAX); + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); let mut builder = RoutesBuilder::default(); builder @@ -190,9 +190,9 @@ impl SourceConfig for OpentelemetryConfig { builder.routes(), cx.shutdown.clone(), ) - .map_err(|error| { - error!(message = "Source future failed.", %error); - }); + .map_err(|error| { + error!(message = "Source future failed.", %error); + }); let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?; let protocol = http_tls_settings.http_protocol_name(); diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index d67129054a795..1732c76ef464c 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -3,8 +3,8 @@ mod integration_tests; #[cfg(test)] mod tests; +mod config; mod grpc; mod http; mod reply; mod status; -mod config; diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 5cc33b0a687e0..f8920cdd3c89e 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -29,13 +29,15 @@ use vector_lib::opentelemetry::proto::{ logs::v1::{logs_service_client::LogsServiceClient, ExportLogsServiceRequest}, metrics::v1::{metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest}, }, - common::v1::{any_value, any_value::Value::StringValue, AnyValue, InstrumentationScope, KeyValue}, + common::v1::{ + any_value, any_value::Value::StringValue, AnyValue, InstrumentationScope, KeyValue, + }, logs::v1::{LogRecord, ResourceLogs, ScopeLogs}, metrics::v1::{ - exponential_histogram_data_point::Buckets, metric::Data, summary_data_point::ValueAtQuantile, AggregationTemporality, - ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, Histogram, HistogramDataPoint, Metric, - NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, - Summary, SummaryDataPoint, + exponential_histogram_data_point::Buckets, metric::Data, + summary_data_point::ValueAtQuantile, AggregationTemporality, ExponentialHistogram, + ExponentialHistogramDataPoint, Gauge, Histogram, HistogramDataPoint, Metric, + NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, Summary, SummaryDataPoint, }, resource::v1::{Resource, Resource as OtelResource}, }; From f642c5062f1c4f5bac5551b32bbfa3fc5844dd53 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 12 Aug 2025 16:09:46 -0400 Subject: [PATCH 3/4] fix and reorganize imports --- src/sources/opentelemetry/config.rs | 59 +++++++++++-------- .../opentelemetry/integration_tests.rs | 6 +- src/sources/opentelemetry/mod.rs | 2 +- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/src/sources/opentelemetry/config.rs b/src/sources/opentelemetry/config.rs index 147e11d731f19..f95a18656b894 100644 --- a/src/sources/opentelemetry/config.rs +++ b/src/sources/opentelemetry/config.rs @@ -1,18 +1,33 @@ use std::net::SocketAddr; use futures::FutureExt; -use futures_util::future::join; -use futures_util::TryFutureExt; -use tonic::codec::CompressionEncoding; -use vector_lib::lookup::{owned_value_path, OwnedTargetPath}; -use vector_lib::opentelemetry::logs::{ - ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY, RESOURCE_KEY, - SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, +use futures_util::{future::join, TryFutureExt}; + +use tonic::{ + codec::CompressionEncoding, + transport::server::RoutesBuilder, +}; + +use vector_lib::{ + config::{log_schema, LegacyKey, LogNamespace}, + configurable::configurable_component, + internal_event::{BytesReceived, EventsReceived, Protocol}, + lookup::{owned_value_path, OwnedTargetPath}, + opentelemetry::{ + logs::{ + ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY, + RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, + }, + proto::collector::{ + logs::v1::logs_service_server::LogsServiceServer, + metrics::v1::metrics_service_server::MetricsServiceServer, + trace::v1::trace_service_server::TraceServiceServer, + }, + }, + schema::Definition, + tls::{MaybeTlsSettings, TlsEnableableConfig}, }; -use crate::sources::http_server::{build_param_matcher, remove_duplicates}; -use crate::sources::opentelemetry::grpc::Service; -use crate::sources::opentelemetry::http::{build_warp_filter, run_http_server}; use crate::{ config::{ DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, @@ -20,21 +35,17 @@ use crate::{ }, http::KeepaliveConfig, serde::bool_or_struct, - sources::{util::grpc::run_grpc_server_with_routes, Source}, - tls::{MaybeTlsSettings, TlsEnableableConfig}, -}; -use tonic::transport::server::RoutesBuilder; -use vector_lib::configurable::configurable_component; -use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; -use vector_lib::opentelemetry::proto::collector::{ - logs::v1::logs_service_server::LogsServiceServer, - metrics::v1::metrics_service_server::MetricsServiceServer, - trace::v1::trace_service_server::TraceServiceServer, -}; -use vector_lib::{ - config::{log_schema, LegacyKey, LogNamespace}, - schema::Definition, + sources::{ + http_server::{build_param_matcher, remove_duplicates}, + opentelemetry::{ + grpc::Service, + http::{build_warp_filter, run_http_server}, + }, + util::grpc::run_grpc_server_with_routes, + Source, + }, }; + use vrl::value::{kind::Collection, Kind}; pub const LOGS: &str = "logs"; diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index feb7717460884..ba720828b3697 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -3,19 +3,19 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use itertools::Itertools; use serde_json::json; -use super::{LOGS, METRICS, TRACES}; use crate::{ config::{log_schema, SourceConfig, SourceContext}, event::EventStatus, + sources::opentelemetry::config::{GrpcConfig, HttpConfig, OpentelemetryConfig, LOGS, METRICS, TRACES}, test_util::{ collect_n, components::{assert_source_compliance, SOURCE_TAGS}, retry_until, wait_for_tcp, - }, + } }; use prost::Message; -use super::{tests::new_source, GrpcConfig, HttpConfig, OpentelemetryConfig}; +use super::tests::new_source; use vector_lib::opentelemetry::proto::{ collector::{metrics::v1::ExportMetricsServiceRequest, trace::v1::ExportTraceServiceRequest}, common::v1::{any_value::Value::StringValue, AnyValue, InstrumentationScope, KeyValue}, diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index 1732c76ef464c..0e4129798b525 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -3,7 +3,7 @@ mod integration_tests; #[cfg(test)] mod tests; -mod config; +pub mod config; mod grpc; mod http; mod reply; From 4bec29a9a757409433d554b6a2899be3291fac0b Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 12 Aug 2025 16:10:00 -0400 Subject: [PATCH 4/4] ran cargo fmt --- src/sources/opentelemetry/config.rs | 5 +---- src/sources/opentelemetry/integration_tests.rs | 6 ++++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/sources/opentelemetry/config.rs b/src/sources/opentelemetry/config.rs index f95a18656b894..6d5c08c0676e9 100644 --- a/src/sources/opentelemetry/config.rs +++ b/src/sources/opentelemetry/config.rs @@ -3,10 +3,7 @@ use std::net::SocketAddr; use futures::FutureExt; use futures_util::{future::join, TryFutureExt}; -use tonic::{ - codec::CompressionEncoding, - transport::server::RoutesBuilder, -}; +use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder}; use vector_lib::{ config::{log_schema, LegacyKey, LogNamespace}, diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index ba720828b3697..0d6644e9070b7 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -6,12 +6,14 @@ use serde_json::json; use crate::{ config::{log_schema, SourceConfig, SourceContext}, event::EventStatus, - sources::opentelemetry::config::{GrpcConfig, HttpConfig, OpentelemetryConfig, LOGS, METRICS, TRACES}, + sources::opentelemetry::config::{ + GrpcConfig, HttpConfig, OpentelemetryConfig, LOGS, METRICS, TRACES, + }, test_util::{ collect_n, components::{assert_source_compliance, SOURCE_TAGS}, retry_until, wait_for_tcp, - } + }, }; use prost::Message;