Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
aa1e605
wip
pront Sep 24, 2025
fde5401
ran cargo fmt
pront Sep 24, 2025
94078dc
changelog
pront Sep 25, 2025
f161cbe
linting
pront Sep 25, 2025
ff38f52
generate component docs
pront Sep 25, 2025
f581f51
Merge remote-tracking branch 'origin/master' into pront-otlp-encoding
pront Sep 25, 2025
8a8501d
Merge branch 'master' into pront-otlp-encoding
pront Sep 26, 2025
ce91543
Merge remote-tracking branch 'origin' into pront-otlp-encoding
pront Oct 1, 2025
65010d9
fmt
pront Oct 2, 2025
5fa6aee
reverting most changes, will go with a codec approach
pront Oct 3, 2025
250feb1
otlp codec WIP
pront Oct 3, 2025
36e617e
Merge remote-tracking branch 'origin/master' into pront-otlp-encoding
pront Oct 3, 2025
65b9baf
make generate-component-docs
pront Oct 3, 2025
6ea1062
forward serializer options
pront Oct 3, 2025
2ee8762
ran cargo fmt
pront Oct 3, 2025
b8ad47a
update e2e tests to use the new codec
pront Oct 3, 2025
0a75c4a
automatically set content header
pront Oct 3, 2025
3516688
automatically set content header
pront Oct 3, 2025
50d380a
fix aggressive utilization reporting
pront Oct 3, 2025
f995377
Merge remote-tracking branch 'origin/master' into pront-otlp-encoding
pront Oct 6, 2025
3df19cb
inspect top level field (not event type)
pront Oct 6, 2025
32b366c
ran cargo fmt
pront Oct 6, 2025
6328d6d
Update changelog to reflect OTLP codec implementation
pront Oct 6, 2025
ef3ee94
use json names
pront Oct 6, 2025
e25d5ba
fmt on linux
pront Oct 6, 2025
ff1820c
Merge branch 'master' into pront-otlp-encoding
pront Oct 6, 2025
829cb22
update otel source cue
pront Oct 6, 2025
2f6e620
only want JSON names for now
pront Oct 6, 2025
0ec66c0
address some review points
pront Oct 6, 2025
628b7e6
review feedback - feature gate - optional
pront Oct 6, 2025
372c5a2
Merge remote-tracking branch 'origin/master' into pront-otlp-encoding
pront Oct 7, 2025
ecbe9ed
more feature gates to fix failing checks
pront Oct 7, 2025
3872d83
fmt linux
pront Oct 7, 2025
8585288
add feature checks
pront Oct 7, 2025
4741b4a
add codecs-opentelemetry to e2e-tests-opentelemetry
pront Oct 7, 2025
5daa059
also add to source-opentelemetry
pront Oct 7, 2025
93a7d8a
one more feature gate
pront Oct 7, 2025
903e3f0
fmt linux
pront Oct 7, 2025
0891c84
more feature gates
pront Oct 8, 2025
80a0906
fmt linux
pront Oct 8, 2025
5cbebdf
Merge branch 'master' into pront-otlp-encoding
pront Oct 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]

# Codecs
codecs-syslog = ["vector-lib/syslog"]
codecs-opentelemetry = ["vector-lib/opentelemetry"]

# Secrets
secrets = ["secrets-aws-secrets-manager"]
Expand Down Expand Up @@ -668,7 +669,17 @@ sources-mqtt = ["dep:rumqttc"]
sources-nats = ["dep:async-nats", "dep:nkeys"]
sources-nginx_metrics = ["dep:nom"]
sources-okta = ["sources-utils-http-client"]
sources-opentelemetry = ["dep:hex", "vector-lib/opentelemetry", "dep:prost", "dep:prost-types", "sources-http_server", "sources-utils-http", "sources-utils-http-headers", "sources-vector"]
sources-opentelemetry = [
"dep:hex",
"codecs-opentelemetry",
"vector-lib/opentelemetry",
"dep:prost",
"dep:prost-types",
"sources-http_server",
"sources-utils-http",
"sources-utils-http-headers",
"sources-vector",
]
sources-postgresql_metrics = ["dep:postgres-openssl", "dep:tokio-postgres"]
sources-prometheus = ["sources-prometheus-scrape", "sources-prometheus-remote-write", "sources-prometheus-pushgateway"]
sources-prometheus-scrape = ["sinks-prometheus", "sources-utils-http-client", "vector-lib/prometheus"]
Expand Down Expand Up @@ -855,7 +866,7 @@ sinks-mqtt = ["dep:rumqttc"]
sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-opentelemetry = ["sinks-http"]
sinks-opentelemetry = ["sinks-http", "codecs-opentelemetry"]
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"]
sinks-postgres = ["dep:sqlx"]
Expand Down Expand Up @@ -1004,7 +1015,8 @@ e2e-tests-opentelemetry = [
"sources-internal_metrics",
"transforms-remap",
"sinks-console",
"sinks-file"
"sinks-file",
"codecs-opentelemetry",
]

vector-api-tests = [
Expand Down
4 changes: 4 additions & 0 deletions changelog.d/otlp_encoding.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added `otlp` codec for encoding Vector events to OTLP format.
The codec can be used with sinks that support encoding configuration.

authors: pront
2 changes: 2 additions & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ flate2.workspace = true
influxdb-line-protocol = { version = "2", default-features = false }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] }
memchr = { version = "2", default-features = false }
opentelemetry-proto = { path = "../opentelemetry-proto", optional = true }
ordered-float.workspace = true
prost.workspace = true
prost-reflect.workspace = true
Expand Down Expand Up @@ -53,3 +54,4 @@ vrl.workspace = true

[features]
syslog = ["dep:syslog_loose"]
opentelemetry = ["dep:opentelemetry-proto"]
4 changes: 4 additions & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod json;
mod logfmt;
mod native;
mod native_json;
#[cfg(feature = "opentelemetry")]
mod otlp;
mod protobuf;
mod raw_message;
mod text;
Expand All @@ -26,6 +28,8 @@ pub use json::{JsonSerializer, JsonSerializerConfig, JsonSerializerOptions};
pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig};
pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
#[cfg(feature = "opentelemetry")]
pub use otlp::{OtlpSerializer, OtlpSerializerConfig};
pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions};
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};
pub use text::{TextSerializer, TextSerializerConfig};
Expand Down
132 changes: 132 additions & 0 deletions lib/codecs/src/encoding/format/otlp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate::encoding::ProtobufSerializer;
use bytes::BytesMut;
use opentelemetry_proto::proto::{
DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
TRACES_REQUEST_MESSAGE_TYPE,
};
use tokio_util::codec::Encoder;
use vector_config_macros::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
use vrl::protobuf::encode::Options;

/// Config used to build an `OtlpSerializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct OtlpSerializerConfig {
// No configuration options needed - OTLP serialization is opinionated
}

impl OtlpSerializerConfig {
/// Build the `OtlpSerializer` from this configuration.
pub fn build(&self) -> Result<OtlpSerializer, crate::encoding::BuildError> {
OtlpSerializer::new()
}

/// The data type of events that are accepted by `OtlpSerializer`.
pub fn input_type(&self) -> DataType {
DataType::Log | DataType::Trace
}

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty()
}
}

/// Serializer that converts an `Event` to bytes using the OTLP (OpenTelemetry Protocol) protobuf format.
///
/// This serializer encodes events using the OTLP protobuf specification, which is the recommended
/// encoding format for OpenTelemetry data. The output is suitable for sending to OTLP-compatible
/// endpoints with `content-type: application/x-protobuf`.
///
/// # Implementation approach
///
/// This serializer converts Vector's internal event representation to the appropriate OTLP message type
/// based on the top-level field in the event:
/// - `resourceLogs` → `ExportLogsServiceRequest`
/// - `resourceMetrics` → `ExportMetricsServiceRequest`
/// - `resourceSpans` → `ExportTraceServiceRequest`
///
/// The implementation is the inverse of what the `opentelemetry` source does when decoding,
/// ensuring round-trip compatibility.
#[derive(Debug, Clone)]
#[allow(dead_code)] // Fields will be used once encoding is implemented
pub struct OtlpSerializer {
logs_descriptor: ProtobufSerializer,
metrics_descriptor: ProtobufSerializer,
traces_descriptor: ProtobufSerializer,
options: Options,
}

impl OtlpSerializer {
/// Creates a new OTLP serializer with the appropriate message descriptors.
pub fn new() -> vector_common::Result<Self> {
let options = Options {
use_json_names: true,
};

let logs_descriptor = ProtobufSerializer::new_from_bytes(
DESCRIPTOR_BYTES,
LOGS_REQUEST_MESSAGE_TYPE,
&options,
)?;

let metrics_descriptor = ProtobufSerializer::new_from_bytes(
DESCRIPTOR_BYTES,
METRICS_REQUEST_MESSAGE_TYPE,
&options,
)?;

let traces_descriptor = ProtobufSerializer::new_from_bytes(
DESCRIPTOR_BYTES,
TRACES_REQUEST_MESSAGE_TYPE,
&options,
)?;

Ok(Self {
logs_descriptor,
metrics_descriptor,
traces_descriptor,
options,
})
}
}

impl Encoder<Event> for OtlpSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
// Determine which descriptor to use based on top-level OTLP fields
// This handles events that were decoded with use_otlp_decoding enabled
// The deserializer uses use_json_names: true, so fields are in camelCase
match &event {
Event::Log(log) => {
if log.contains(RESOURCE_LOGS_JSON_FIELD) {
self.logs_descriptor.encode(event, buffer)
} else if log.contains(RESOURCE_METRICS_JSON_FIELD) {
// Currently the OTLP metrics are Vector logs (not metrics).
self.metrics_descriptor.encode(event, buffer)
} else {
Err(format!(
"Log event does not contain OTLP top-level fields ({RESOURCE_LOGS_JSON_FIELD} or {RESOURCE_METRICS_JSON_FIELD})",
)
.into())
}
}
Event::Trace(trace) => {
if trace.contains(RESOURCE_SPANS_JSON_FIELD) {
self.traces_descriptor.encode(event, buffer)
} else {
Err(format!(
"Trace event does not contain OTLP top-level field ({RESOURCE_SPANS_JSON_FIELD})",
)
.into())
}
}
Event::Metric(_) => {
Err("OTLP serializer does not support native Vector metrics yet.".into())
}
}
}
}
40 changes: 30 additions & 10 deletions lib/codecs/src/encoding/format/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use vector_core::{
event::{Event, Value},
schema,
};
use vrl::protobuf::encode::Options;
use vrl::protobuf::{descriptor::get_message_descriptor, encode::encode_message};
use vrl::protobuf::{
descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
encode::{Options, encode_message},
};

/// Config used to build a `ProtobufSerializer`.
#[configurable_component]
Expand All @@ -26,7 +28,10 @@ impl ProtobufSerializerConfig {
pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
let message_descriptor =
get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?;
Ok(ProtobufSerializer { message_descriptor })
Ok(ProtobufSerializer {
message_descriptor,
options: Options::default(),
})
}

/// The data type of events that are accepted by `ProtobufSerializer`.
Expand Down Expand Up @@ -64,12 +69,29 @@ pub struct ProtobufSerializerOptions {
pub struct ProtobufSerializer {
/// The protobuf message definition to use for serialization.
message_descriptor: MessageDescriptor,
options: Options,
}

impl ProtobufSerializer {
/// Creates a new `ProtobufSerializer`.
pub fn new(message_descriptor: MessageDescriptor) -> Self {
Self { message_descriptor }
Self {
message_descriptor,
options: Options::default(),
}
}

/// Creates a new serializer instance using the descriptor bytes directly.
pub fn new_from_bytes(
desc_bytes: &[u8],
message_type: &str,
options: &Options,
) -> vector_common::Result<Self> {
let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
Ok(Self {
message_descriptor,
options: options.clone(),
})
}

/// Get a description of the message type used in serialization.
Expand All @@ -83,16 +105,14 @@ impl Encoder<Event> for ProtobufSerializer {

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let message = match event {
Event::Log(log) => encode_message(
&self.message_descriptor,
log.into_parts().0,
&Options::default(),
),
Event::Log(log) => {
encode_message(&self.message_descriptor, log.into_parts().0, &self.options)
}
Event::Metric(_) => unimplemented!(),
Event::Trace(trace) => encode_message(
&self.message_descriptor,
Value::Object(trace.into_parts().0),
&Options::default(),
&self.options,
),
}?;
message.encode(buffer).map_err(Into::into)
Expand Down
Loading
Loading