diff --git a/changelog.d/otlp_decoding.enhancement.md b/changelog.d/otlp_decoding.enhancement.md new file mode 100644 index 0000000000000..337671779505d --- /dev/null +++ b/changelog.d/otlp_decoding.enhancement.md @@ -0,0 +1,8 @@ +The `opentelemetry` source now supports a new decoding mode which can be enabled by setting `use_otlp_decoding` to `true`. In this mode, +all events preserve the [OTLP](https://opentelemetry.io/docs/specs/otel/protocol/) format. These events can be forwarded directly to +the `opentelemetry` sink without modifications. + +**Note:** The OTLP metric format and the Vector metric format differ, so the `opentelemetry` source emits OTLP formatted metrics as Vector log +events. These events cannot be used with existing metrics transforms. However, they can be ingested by the OTEL collectors as metrics. + +authors: pront diff --git a/lib/codecs/src/decoding/format/mod.rs b/lib/codecs/src/decoding/format/mod.rs index 9e2dee7de1ce2..783aa5d79d19d 100644 --- a/lib/codecs/src/decoding/format/mod.rs +++ b/lib/codecs/src/decoding/format/mod.rs @@ -44,11 +44,18 @@ pub trait Deserializer: DynClone + Send + Sync { /// frame can potentially hold multiple events, e.g. when parsing a JSON /// array. However, we optimize the most common case of emitting one event /// by not requiring heap allocations for it. + /// + /// **Note**: The type of the produced events depends on the implementation. fn parse( &self, bytes: Bytes, log_namespace: LogNamespace, ) -> vector_common::Result>; + + /// Parses trace events from bytes. + fn parse_traces(&self, _bytes: Bytes) -> vector_common::Result> { + unimplemented!() + } } dyn_clone::clone_trait_object!(Deserializer); diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index d30ef25068d0b..beece2053a795 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -6,17 +6,15 @@ use derivative::Derivative; use prost_reflect::{DynamicMessage, MessageDescriptor}; use smallvec::{SmallVec, smallvec}; use vector_config::configurable_component; -use vector_core::event::LogEvent; +use vector_core::event::{LogEvent, TraceEvent}; use vector_core::{ config::{DataType, LogNamespace, log_schema}, event::Event, schema, }; -use vrl::protobuf::{ - descriptor::get_message_descriptor, - parse::{Options, proto_to_value}, -}; -use vrl::value::Kind; +use vrl::protobuf::descriptor::{get_message_descriptor, get_message_descriptor_from_bytes}; +use vrl::protobuf::parse::{Options, proto_to_value}; +use vrl::value::{Kind, Value}; use super::Deserializer; @@ -86,30 +84,56 @@ pub struct ProtobufDeserializerOptions { #[derive(Debug, Clone)] pub struct ProtobufDeserializer { message_descriptor: MessageDescriptor, + options: Options, } impl ProtobufDeserializer { /// Creates a new `ProtobufDeserializer`. pub fn new(message_descriptor: MessageDescriptor) -> Self { - Self { message_descriptor } + Self { + message_descriptor, + options: Default::default(), + } + } + + /// Creates a new deserializer instance using the descriptor bytes directly. + pub fn new_from_bytes( + desc_bytes: &[u8], + message_type: &str, + options: Options, + ) -> vector_common::Result { + let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?; + Ok(Self { + message_descriptor, + options, + }) } } +fn extract_vrl_value( + bytes: Bytes, + message_descriptor: &MessageDescriptor, + options: &Options, +) -> vector_common::Result { + let dynamic_message = DynamicMessage::decode(message_descriptor.clone(), bytes) + .map_err(|error| format!("Error parsing protobuf: {error:?}"))?; + + Ok(proto_to_value( + &prost_reflect::Value::Message(dynamic_message), + None, + options, + )?) +} + impl Deserializer for ProtobufDeserializer { fn parse( &self, bytes: Bytes, log_namespace: LogNamespace, ) -> vector_common::Result> { - let dynamic_message = DynamicMessage::decode(self.message_descriptor.clone(), bytes) - .map_err(|error| format!("Error parsing protobuf: {error:?}"))?; - - let proto_vrl = proto_to_value( - &prost_reflect::Value::Message(dynamic_message), - None, - &Options::default(), - )?; - let mut event = Event::Log(LogEvent::from(proto_vrl)); + let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?; + let mut event = Event::Log(LogEvent::from(vrl_value)); + let event = match log_namespace { LogNamespace::Vector => event, LogNamespace::Legacy => { @@ -126,6 +150,12 @@ impl Deserializer for ProtobufDeserializer { Ok(smallvec![event]) } + + fn parse_traces(&self, bytes: Bytes) -> vector_common::Result> { + let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?; + let trace_event = Event::Trace(TraceEvent::from(vrl_value)); + Ok(smallvec![trace_event]) + } } impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer { diff --git a/lib/opentelemetry-proto/build.rs b/lib/opentelemetry-proto/build.rs index 19fc3d533bbc7..7e5fcb5f40279 100644 --- a/lib/opentelemetry-proto/build.rs +++ b/lib/opentelemetry-proto/build.rs @@ -1,5 +1,7 @@ use glob::glob; -use std::{env, io::Result, path::PathBuf}; +use std::fs::{read_to_string, write}; +use std::path::Path; +use std::{io::Result, path::PathBuf}; fn main() -> Result<()> { let proto_root = PathBuf::from("src/proto/opentelemetry-proto"); @@ -10,12 +12,7 @@ fn main() -> Result<()> { .filter_map(|result| result.ok()) .collect(); - // Set up re-run triggers - for proto in &proto_paths { - println!("cargo:rerun-if-changed={}", proto.display()); - } - - let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + let out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap()); let descriptor_path = out_dir.join("opentelemetry-proto.desc"); tonic_build::configure() @@ -24,5 +21,22 @@ fn main() -> Result<()> { .file_descriptor_set_path(&descriptor_path) .compile(&proto_paths, &[include_path])?; + write_static_descriptor_reference(&descriptor_path, &out_dir)?; + + Ok(()) +} + +fn write_static_descriptor_reference(descriptor_path: &Path, out_dir: &Path) -> Result<()> { + let include_line = format!( + "pub static DESCRIPTOR_BYTES: &[u8] = include_bytes!(r\"{}\");\n", + descriptor_path.display() + ); + + let include_file = out_dir.join("opentelemetry-proto.rs"); + let existing = read_to_string(&include_file).ok(); + if existing.as_deref() != Some(&include_line) { + write(&include_file, include_line)?; + } + Ok(()) } diff --git a/lib/opentelemetry-proto/src/proto.rs b/lib/opentelemetry-proto/src/proto.rs index 4c2b1acf340c6..5559113bd14db 100644 --- a/lib/opentelemetry-proto/src/proto.rs +++ b/lib/opentelemetry-proto/src/proto.rs @@ -51,3 +51,6 @@ pub mod resource { tonic::include_proto!("opentelemetry.proto.resource.v1"); } } + +/// The raw descriptor bytes for all the above. +include!(concat!(env!("OUT_DIR"), "/opentelemetry-proto.rs")); diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 4adabefccf857..276bac75668b5 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -118,6 +118,13 @@ impl TraceEvent { } } +impl From for TraceEvent { + fn from(value: Value) -> Self { + let log_event = LogEvent::from(value); + Self(log_event) + } +} + impl From for TraceEvent { fn from(log: LogEvent) -> Self { Self(log) diff --git a/scripts/e2e/opentelemetry-logs/compose.yaml b/scripts/e2e/opentelemetry-logs/compose.yaml index c82b2e8a569d8..d721abf6e7aa7 100644 --- a/scripts/e2e/opentelemetry-logs/compose.yaml +++ b/scripts/e2e/opentelemetry-logs/compose.yaml @@ -2,7 +2,7 @@ name: opentelemetry-vector-e2e services: otel-collector-source: container_name: otel-collector-source - image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION:-latest} + image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION} init: true volumes: - type: bind @@ -33,7 +33,7 @@ services: otel-collector-sink: container_name: otel-collector-sink - image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION:-latest} + image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION} init: true volumes: - type: bind @@ -57,7 +57,7 @@ services: init: true volumes: - type: bind - source: ../../../tests/data/e2e/opentelemetry/logs/vector.yaml + source: ../../../tests/data/e2e/opentelemetry/logs/${CONFIG_VECTOR_CONFIG} target: /etc/vector/vector.yaml read_only: true - type: bind diff --git a/scripts/e2e/opentelemetry-logs/test.yaml b/scripts/e2e/opentelemetry-logs/test.yaml index 70e3862ae3347..4063e7cc1214a 100644 --- a/scripts/e2e/opentelemetry-logs/test.yaml +++ b/scripts/e2e/opentelemetry-logs/test.yaml @@ -1,5 +1,5 @@ features: -- e2e-tests-opentelemetry + - e2e-tests-opentelemetry test: "e2e" @@ -14,6 +14,7 @@ runner: matrix: # Determines which `otel/opentelemetry-collector-contrib` version to use collector_version: [ 'latest' ] + vector_config: [ 'vector_default.yaml', 'vector_otlp.yaml' ] # Only trigger this integration test if relevant OTEL source/sink files change paths: diff --git a/src/sources/opentelemetry/config.rs b/src/sources/opentelemetry/config.rs index b3c2ab3a38686..3c58bcdc33e51 100644 --- a/src/sources/opentelemetry/config.rs +++ b/src/sources/opentelemetry/config.rs @@ -1,11 +1,10 @@ -use std::net::SocketAddr; - use futures::FutureExt; use futures_util::{TryFutureExt, future::join}; +use std::net::SocketAddr; use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder}; - use vector_lib::{ + codecs::decoding::ProtobufDeserializer, config::{LegacyKey, LogNamespace, log_schema}, configurable::configurable_component, internal_event::{BytesReceived, EventsReceived, Protocol}, @@ -24,6 +23,7 @@ use vector_lib::{ schema::Definition, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; +use vrl::protobuf::parse::Options; use crate::{ config::{ @@ -49,6 +49,13 @@ pub const LOGS: &str = "logs"; pub const METRICS: &str = "metrics"; pub const TRACES: &str = "traces"; +pub const OTEL_PROTO_LOGS_REQUEST: &str = + "opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest"; +pub const OTEL_PROTO_TRACES_REQUEST: &str = + "opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest"; +pub const OTEL_PROTO_METRICS_REQUEST: &str = + "opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest"; + /// Configuration for the `opentelemetry` source. #[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))] #[derive(Clone, Debug)] @@ -68,6 +75,15 @@ pub struct OpentelemetryConfig { #[configurable(metadata(docs::hidden))] #[serde(default)] pub log_namespace: Option, + + /// Setting this field will override the legacy mapping of OTEL protos to Vector events and use the proto directly. + /// + /// One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format. + /// This means that components that work on metrics, will not be compatible with this output. + /// However, these events can be forwarded directly to a downstream OTEL collector. + #[configurable(derived)] + #[serde(default)] + pub use_otlp_decoding: bool, } /// Configuration for the `opentelemetry` gRPC server. @@ -145,11 +161,32 @@ impl GenerateConfig for OpentelemetryConfig { http: example_http_config(), acknowledgements: Default::default(), log_namespace: None, + use_otlp_decoding: false, }) .unwrap() } } +impl OpentelemetryConfig { + fn get_deserializer( + &self, + message_type: &str, + ) -> vector_common::Result> { + if self.use_otlp_decoding { + let deserializer = ProtobufDeserializer::new_from_bytes( + vector_lib::opentelemetry::proto::DESCRIPTOR_BYTES, + message_type, + Options { + use_json_names: true, + }, + )?; + Ok(Some(deserializer)) + } else { + Ok(None) + } + } +} + #[async_trait::async_trait] #[typetag::serde(name = "opentelemetry")] impl SourceConfig for OpentelemetryConfig { @@ -160,29 +197,35 @@ impl SourceConfig for OpentelemetryConfig { let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?; + let log_deserializer = self.get_deserializer(OTEL_PROTO_LOGS_REQUEST)?; let log_service = LogsServiceServer::new(Service { pipeline: cx.out.clone(), acknowledgements, log_namespace, events_received: events_received.clone(), + deserializer: log_deserializer.clone(), }) .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(usize::MAX); - let trace_service = TraceServiceServer::new(Service { + let metric_deserializer = self.get_deserializer(OTEL_PROTO_METRICS_REQUEST)?; + let metrics_service = MetricsServiceServer::new(Service { pipeline: cx.out.clone(), acknowledgements, log_namespace, events_received: events_received.clone(), + deserializer: metric_deserializer, }) .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(usize::MAX); - let metrics_service = MetricsServiceServer::new(Service { + let trace_deserializer = self.get_deserializer(OTEL_PROTO_TRACES_REQUEST)?; + let trace_service = TraceServiceServer::new(Service { pipeline: cx.out.clone(), acknowledgements, log_namespace, events_received: events_received.clone(), + deserializer: trace_deserializer, }) .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(usize::MAX); @@ -192,6 +235,7 @@ impl SourceConfig for OpentelemetryConfig { .add_service(log_service) .add_service(metrics_service) .add_service(trace_service); + let grpc_source = run_grpc_server_with_routes( self.grpc.address, grpc_tls_settings, @@ -207,6 +251,7 @@ impl SourceConfig for OpentelemetryConfig { let bytes_received = register!(BytesReceived::from(Protocol::from(protocol))); let headers = build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?; + let filters = build_warp_filter( acknowledgements, log_namespace, @@ -214,7 +259,9 @@ impl SourceConfig for OpentelemetryConfig { bytes_received, events_received, headers, + log_deserializer, ); + let http_source = run_http_server( self.http.address, http_tls_settings, @@ -316,9 +363,14 @@ impl SourceConfig for OpentelemetryConfig { } }; + let metrics_output = if self.use_otlp_decoding { + SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS) + } else { + SourceOutput::new_metrics().with_port(METRICS) + }; vec![ SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS), - SourceOutput::new_metrics().with_port(METRICS), + metrics_output, SourceOutput::new_traces().with_port(TRACES), ] } diff --git a/src/sources/opentelemetry/grpc.rs b/src/sources/opentelemetry/grpc.rs index 463471626a1c0..4d0554ba271d1 100644 --- a/src/sources/opentelemetry/grpc.rs +++ b/src/sources/opentelemetry/grpc.rs @@ -1,5 +1,14 @@ +use crate::sources::opentelemetry::config::METRICS; +use crate::{ + SourceSender, + internal_events::{EventsReceived, StreamClosedError}, + sources::opentelemetry::config::{LOGS, TRACES}, +}; use futures::TryFutureExt; +use prost::Message; use tonic::{Request, Response, Status}; +use vector_lib::codecs::decoding::ProtobufDeserializer; +use vector_lib::codecs::decoding::format::Deserializer; use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; use vector_lib::opentelemetry::proto::collector::{ logs::v1::{ @@ -19,18 +28,13 @@ use vector_lib::{ event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event}, }; -use crate::{ - SourceSender, - internal_events::{EventsReceived, StreamClosedError}, - sources::opentelemetry::config::{LOGS, METRICS, TRACES}, -}; - #[derive(Clone)] pub(super) struct Service { pub pipeline: SourceSender, pub acknowledgements: bool, pub events_received: Registered, pub log_namespace: LogNamespace, + pub deserializer: Option, } #[tonic::async_trait] @@ -39,12 +43,21 @@ impl TraceService for Service { &self, request: Request, ) -> Result, Status> { - let events: Vec = request - .into_inner() - .resource_spans - .into_iter() - .flat_map(|v| v.into_event_iter()) - .collect(); + let events = if let Some(deserializer) = self.deserializer.as_ref() { + let raw_bytes = request.get_ref().encode_to_vec(); + let bytes = bytes::Bytes::from(raw_bytes); + deserializer + .parse_traces(bytes) + .map_err(|e| Status::invalid_argument(e.to_string())) + .map(|buf| buf.into_vec())? + } else { + request + .into_inner() + .resource_spans + .into_iter() + .flat_map(|v| v.into_event_iter()) + .collect() + }; self.handle_events(events, TRACES).await?; Ok(Response::new(ExportTraceServiceResponse { @@ -59,12 +72,21 @@ impl LogsService for Service { &self, request: Request, ) -> Result, Status> { - let events: Vec = request - .into_inner() - .resource_logs - .into_iter() - .flat_map(|v| v.into_event_iter(self.log_namespace)) - .collect(); + let events = if let Some(deserializer) = self.deserializer.as_ref() { + let raw_bytes = request.get_ref().encode_to_vec(); + let bytes = bytes::Bytes::from(raw_bytes); + deserializer + .parse(bytes, self.log_namespace) + .map_err(|e| Status::invalid_argument(e.to_string())) + .map(|buf| buf.into_vec())? + } else { + request + .into_inner() + .resource_logs + .into_iter() + .flat_map(|v| v.into_event_iter(self.log_namespace)) + .collect() + }; self.handle_events(events, LOGS).await?; Ok(Response::new(ExportLogsServiceResponse { @@ -79,12 +101,23 @@ impl MetricsService for Service { &self, request: Request, ) -> Result, Status> { - let events: Vec = request - .into_inner() - .resource_metrics - .into_iter() - .flat_map(|v| v.into_event_iter()) - .collect(); + let events = if let Some(deserializer) = self.deserializer.as_ref() { + let raw_bytes = request.get_ref().encode_to_vec(); + // Major caveat here, the output event will be logs. + let bytes = bytes::Bytes::from(raw_bytes); + deserializer + .parse(bytes, self.log_namespace) + .map_err(|e| Status::invalid_argument(e.to_string())) + .map(|buf| buf.into_vec())? + } else { + request + .into_inner() + .resource_metrics + .into_iter() + .flat_map(|v| v.into_event_iter()) + .collect() + }; + self.handle_events(events, METRICS).await?; Ok(Response::new(ExportMetricsServiceResponse { diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index c878cdee2c24f..db5fc9752b241 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -1,6 +1,21 @@ use std::time::Duration; use std::{convert::Infallible, net::SocketAddr}; +use super::{reply::protobuf, status::Status}; +use crate::common::http::ErrorMessage; +use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer}; +use crate::sources::http_server::HttpConfigParamKind; +use crate::sources::opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES}; +use crate::sources::util::add_headers; +use crate::{ + SourceSender, + event::Event, + http::build_http_trace_layer, + internal_events::{EventsReceived, StreamClosedError}, + shutdown::ShutdownSignal, + sources::util::decode, + tls::MaybeTlsSettings, +}; use bytes::Bytes; use futures_util::FutureExt; use http::StatusCode; @@ -10,13 +25,16 @@ use snafu::Snafu; use tokio::net::TcpStream; use tower::ServiceBuilder; use tracing::Span; +use vector_lib::codecs::decoding::ProtobufDeserializer; +use vector_lib::codecs::decoding::format::Deserializer; use vector_lib::internal_event::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered, }; +use vector_lib::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse; use vector_lib::opentelemetry::proto::collector::{ logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse}, metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse}, - trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse}, + trace::v1::ExportTraceServiceRequest, }; use vector_lib::tls::MaybeTlsIncomingStream; use vector_lib::{ @@ -28,23 +46,6 @@ use warp::{ Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response, }; -use crate::common::http::ErrorMessage; -use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer}; -use crate::sources::http_server::HttpConfigParamKind; -use crate::sources::util::add_headers; -use crate::{ - SourceSender, - event::Event, - http::build_http_trace_layer, - internal_events::{EventsReceived, StreamClosedError}, - shutdown::ShutdownSignal, - sources::util::decode, - tls::MaybeTlsSettings, -}; - -use super::{reply::protobuf, status::Status}; -use crate::sources::opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES}; - #[derive(Clone, Copy, Debug, Snafu)] pub(crate) enum ApiError { ServerShutdown, @@ -94,6 +95,7 @@ pub(crate) fn build_warp_filter( bytes_received: Registered, events_received: Registered, headers: Vec, + deserializer: Option, ) -> BoxedFilter<(Response,)> { let log_filters = build_warp_log_filter( acknowledgements, @@ -102,18 +104,21 @@ pub(crate) fn build_warp_filter( bytes_received.clone(), events_received.clone(), headers.clone(), + deserializer.clone(), ); let metrics_filters = build_warp_metrics_filter( acknowledgements, out.clone(), bytes_received.clone(), events_received.clone(), + deserializer.clone(), ); let trace_filters = build_warp_trace_filter( acknowledgements, out.clone(), bytes_received, events_received, + deserializer, ); log_filters .or(trace_filters) @@ -138,16 +143,24 @@ fn enrich_events( ); } -fn build_warp_log_filter( +fn build_ingest_filter( + telemetry_type: &'static str, acknowledgements: bool, - log_namespace: LogNamespace, out: SourceSender, - bytes_received: Registered, - events_received: Registered, - headers: Vec, -) -> BoxedFilter<(Response,)> { + make_events: F, +) -> BoxedFilter<(Response,)> +where + Resp: prost::Message + Default + Send + 'static, + F: Clone + + Send + + Sync + + 'static + + Fn(Option, HeaderMap, Bytes) -> Result, ErrorMessage>, +{ warp::post() - .and(warp::path!("v1" / "logs")) + .and(warp::path("v1")) + .and(warp::path(telemetry_type)) + .and(warp::path::end()) .and(warp::header::exact_ignore_case( "content-type", "application/x-protobuf", @@ -156,89 +169,108 @@ fn build_warp_log_filter( .and(warp::header::headers_cloned()) .and(warp::body::bytes()) .and_then( - move |encoding_header: Option, headers_config: HeaderMap, body: Bytes| { - let events = decode(encoding_header.as_deref(), body) - .and_then(|body| { - bytes_received.emit(ByteSize(body.len())); - decode_log_body(body, log_namespace, &events_received) - }) - .map(|mut events| { - enrich_events(&mut events, &headers, &headers_config, log_namespace); - events - }); - + move |encoding_header: Option, headers: HeaderMap, body: Bytes| { + let events = make_events(encoding_header, headers, body); handle_request( events, acknowledgements, out.clone(), - LOGS, - ExportLogsServiceResponse::default(), + telemetry_type, + Resp::default(), ) }, ) .boxed() } +fn build_warp_log_filter( + acknowledgements: bool, + log_namespace: LogNamespace, + source_sender: SourceSender, + bytes_received: Registered, + events_received: Registered, + headers_cfg: Vec, + deserializer: Option, +) -> BoxedFilter<(Response,)> { + let make_events = move |encoding_header: Option, headers: HeaderMap, body: Bytes| { + if let Some(d) = deserializer.as_ref() { + d.parse(body, log_namespace) + .map(|r| r.into_vec()) + .map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string())) + } else { + decode(encoding_header.as_deref(), body) + .and_then(|body| { + bytes_received.emit(ByteSize(body.len())); + decode_log_body(body, log_namespace, &events_received) + }) + .map(|mut events| { + enrich_events(&mut events, &headers_cfg, &headers, log_namespace); + events + }) + } + }; + + build_ingest_filter::( + LOGS, + acknowledgements, + source_sender, + make_events, + ) +} fn build_warp_metrics_filter( acknowledgements: bool, - out: SourceSender, + source_sender: SourceSender, bytes_received: Registered, events_received: Registered, + deserializer: Option, ) -> BoxedFilter<(Response,)> { - warp::post() - .and(warp::path!("v1" / "metrics")) - .and(warp::header::exact_ignore_case( - "content-type", - "application/x-protobuf", - )) - .and(warp::header::optional::("content-encoding")) - .and(warp::body::bytes()) - .and_then(move |encoding_header: Option, body: Bytes| { - let events = decode(encoding_header.as_deref(), body).and_then(|body| { + let make_events = move |encoding_header: Option, _headers: HeaderMap, body: Bytes| { + if let Some(d) = deserializer.as_ref() { + d.parse(body, LogNamespace::default()) + .map(|r| r.into_vec()) + .map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string())) + } else { + decode(encoding_header.as_deref(), body).and_then(|body| { bytes_received.emit(ByteSize(body.len())); decode_metrics_body(body, &events_received) - }); - - handle_request( - events, - acknowledgements, - out.clone(), - METRICS, - ExportMetricsServiceResponse::default(), - ) - }) - .boxed() + }) + } + }; + + build_ingest_filter::( + METRICS, + acknowledgements, + source_sender, + make_events, + ) } fn build_warp_trace_filter( acknowledgements: bool, - out: SourceSender, + source_sender: SourceSender, bytes_received: Registered, events_received: Registered, + deserializer: Option, ) -> BoxedFilter<(Response,)> { - warp::post() - .and(warp::path!("v1" / "traces")) - .and(warp::header::exact_ignore_case( - "content-type", - "application/x-protobuf", - )) - .and(warp::header::optional::("content-encoding")) - .and(warp::body::bytes()) - .and_then(move |encoding_header: Option, body: Bytes| { - let events = decode(encoding_header.as_deref(), body).and_then(|body| { + let make_events = move |encoding_header: Option, _headers: HeaderMap, body: Bytes| { + if let Some(d) = deserializer.as_ref() { + d.parse_traces(body) + .map(|r| r.into_vec()) + .map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string())) + } else { + decode(encoding_header.as_deref(), body).and_then(|body| { bytes_received.emit(ByteSize(body.len())); decode_trace_body(body, &events_received) - }); - - handle_request( - events, - acknowledgements, - out.clone(), - TRACES, - ExportTraceServiceResponse::default(), - ) - }) - .boxed() + }) + } + }; + + build_ingest_filter::( + TRACES, + acknowledgements, + source_sender, + make_events, + ) } fn decode_trace_body( diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index d45fa5f51778b..d0af9142ad733 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -63,6 +63,7 @@ async fn receive_logs_legacy_namespace() { }, acknowledgements: Default::default(), log_namespace: Default::default(), + use_otlp_decoding: false, }; let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); @@ -161,6 +162,7 @@ async fn receive_trace() { }, acknowledgements: Default::default(), log_namespace: Default::default(), + use_otlp_decoding: false, }; let (sender, trace_output, _) = new_source(EventStatus::Delivered, TRACES.to_string()); @@ -265,6 +267,7 @@ async fn receive_metric() { }, acknowledgements: Default::default(), log_namespace: Default::default(), + use_otlp_decoding: false, }; let (sender, metrics_output, _) = new_source(EventStatus::Delivered, METRICS.to_string()); diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index aaf4126b81469..69180c683a802 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -207,7 +207,8 @@ async fn receive_grpc_logs_legacy_namespace() { .config .outputs(LogNamespace::Legacy) .remove(0) - .schema_definition(true); + .schema_definition(true) + .unwrap(); // send request via grpc client let mut client = LogsServiceClient::connect(format!("http://{}", env.grpc_addr)) @@ -219,9 +220,7 @@ async fn receive_grpc_logs_legacy_namespace() { // we just send one, so only one output assert_eq!(output.len(), 1); let actual_event = output.pop().unwrap(); - schema_definitions - .unwrap() - .assert_valid_for_event(&actual_event); + schema_definitions.assert_valid_for_event(&actual_event); let expect_vec = vec_into_btmap(vec![ ( "attributes", @@ -1089,6 +1088,7 @@ async fn http_headers() { }, acknowledgements: Default::default(), log_namespace: Default::default(), + use_otlp_decoding: false, }; let schema_definitions = source .outputs(LogNamespace::Legacy) @@ -1194,6 +1194,7 @@ pub async fn build_otlp_test_env( }, acknowledgements: Default::default(), log_namespace, + use_otlp_decoding: false, }; let (sender, output, _) = new_source(EventStatus::Delivered, event_name.to_string()); diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index ddd4085901829..a589940b487be 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -129,7 +129,6 @@ pub trait HttpSource: Clone + Send + Sync + 'static { addr: Option| { debug!(message = "Handling HTTP request.", headers = ?headers); let http_path = path.as_str(); - let events = auth_matcher .as_ref() .map_or(Ok(()), |a| { diff --git a/tests/data/e2e/opentelemetry/logs/vector.yaml b/tests/data/e2e/opentelemetry/logs/vector_default.yaml similarity index 100% rename from tests/data/e2e/opentelemetry/logs/vector.yaml rename to tests/data/e2e/opentelemetry/logs/vector_default.yaml diff --git a/tests/data/e2e/opentelemetry/logs/vector_oltp.yaml b/tests/data/e2e/opentelemetry/logs/vector_oltp.yaml new file mode 100644 index 0000000000000..a3b98647059c5 --- /dev/null +++ b/tests/data/e2e/opentelemetry/logs/vector_oltp.yaml @@ -0,0 +1,49 @@ +sources: + source0: + type: opentelemetry + grpc: + address: 0.0.0.0:4317 + http: + address: 0.0.0.0:4318 + keepalive: + max_connection_age_jitter_factor: 0.1 + max_connection_age_secs: 300 + use_otlp_decoding: true + + internal_metrics: + type: internal_metrics + scrape_interval_secs: 60 + +sinks: + otel_sink: + inputs: + - source0.logs + type: opentelemetry + protocol: + type: http + uri: http://otel-collector-sink:5318/v1/logs + method: post + encoding: + codec: json + framing: + method: newline_delimited + batch: + max_events: 1 + request: + headers: + content-type: application/json + + otel_file_sink: + type: file + path: "/output/vector-file-sink.log" + inputs: + - source0.logs + encoding: + codec: json + + metrics_console_sink: + type: console + inputs: + - internal_metrics + encoding: + codec: json diff --git a/website/cue/reference/components/sources/generated/opentelemetry.cue b/website/cue/reference/components/sources/generated/opentelemetry.cue index 2ab191d1d6fdb..4791cc458c8cf 100644 --- a/website/cue/reference/components/sources/generated/opentelemetry.cue +++ b/website/cue/reference/components/sources/generated/opentelemetry.cue @@ -325,4 +325,15 @@ generated: components: sources: opentelemetry: configuration: { } } } + use_otlp_decoding: { + description: """ + Setting this field will override the legacy mapping of OTEL protos to Vector events and use the proto directly. + + One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format. + This means that components that work on metrics, will not be compatible with this output. + However, these events can be forwarded directly to a downstream OTEL collector. + """ + required: false + type: bool: default: false + } }