feat(sink): add opentelemetry metrics support #22550
Conversation
Hi @brittonhayes, thank you for the PR. Before I dive into it, I think we can eliminate a lot of the boilerplate. Basically, I don't think we want to introduce a new sink here, because the current sink already allows for metric inputs. We do want all the new config bits and encoding from this PR, though. Take a look and let me know if the refactoring makes sense to you.
Totally makes sense! I can consolidate into the otel sink and make this way simpler.
rtrieu
left a comment
Left a few non-blocking suggestions.
> The maximum size of a batch that is processed by a sink.
> This is based on the uncompressed size of the batched events, before they are
> serialized/compressed.

Suggested change:

```diff
-serialized/compressed.
+serialized, or compressed.
```
> Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior.
> Note that the retry backoff policy follows the Fibonacci sequence.

Suggested change:

```diff
-Note that the retry backoff policy follows the Fibonacci sequence.
+**Note**: The retry backoff policy follows the Fibonacci sequence.
```
```rust
#[async_trait::async_trait]
#[typetag::serde(name = "opentelemetry")]
impl SinkConfig for OpenTelemetryConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
```
I'm not really sure what the best approach is here. I might need some advice on how to make a sink that supports being sent logs + metrics. I assume that this build fn is where we handle that, but wasn't sure.
```rust
        Ok((sink, healthcheck))
    }

    fn input(&self) -> Input {
```
also not totally sure how this works
There are several sinks that accept more than one telemetry data type, for example:
- https://github.com/vectordotdev/vector/blob/master/src/sinks/appsignal/config.rs#L140
- (codec specific) `vector/src/sinks/console/config.rs`, line 99 at b70e555: `Input::new(self.encoding.config().1.input_type())`

You can draw inspiration from how those sinks handle different types before sending them downstream.
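For instance, a minimal sketch of what `input()` could look like if this sink accepted both signals, assuming `DataType` combines as flags the way the appsignal example linked above uses it (surrounding impl and imports omitted):

```rust
// Sketch only: declare that the sink accepts both metric and log events,
// mirroring the appsignal sink's input() implementation.
fn input(&self) -> Input {
    Input::new(DataType::Metric | DataType::Log)
}
```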
```rust
        };

        // Create the final sink
        let sink = if let Some(metrics_sink) = metrics_sink {
```
I believe this is the incorrect way to make a sink support logs and/or metrics, so open to any advice here.
After my response here, I realize that this is the best example, since the current implementation is using this code directly:
- The build implementation: https://github.com/vectordotdev/vector/blob/master/src/sinks/http/config.rs#L240-L336
- Note how the encoder accepts the generic `Event` type: https://github.com/vectordotdev/vector/blob/master/src/sinks/http/encoder.rs#L49
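Reduced to a self-contained analogue, the pattern is a single encoder entry point that dispatches on the event variant (these are stand-in types for illustration, not Vector's actual `Event` enum, which also has a `Trace` variant):

```rust
// Stand-in event type; Vector's real enum is richer.
enum Event {
    Log(String),
    Metric { name: String, value: f64 },
}

// One encoder entry point handling every variant, which is how a single
// sink can serialize more than one telemetry type.
fn encode_event(event: &Event) -> Vec<u8> {
    match event {
        Event::Log(message) => message.clone().into_bytes(),
        Event::Metric { name, value } => format!("{name}={value}").into_bytes(),
    }
}

fn main() {
    let events = [
        Event::Log("hello".to_string()),
        Event::Metric { name: "requests_total".to_string(), value: 3.0 },
    ];
    for event in &events {
        println!("encoded {} bytes", encode_event(event).len());
    }
}
```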
@pront I've consolidated into the OTEL sink and also updated it to leverage the OTel SDK for encoding. This is very much a work in progress, but I'd love a review and some direction on how to land this!
Thank you @brittonhayes, this is a great PR and something that the community has frequently requested. I will do a deeper review later this week.
Please note there is a branch conflict and some CI checks are failing.
I took a quick look and this PR looks like a step in the right direction and is a very highly requested improvement. Thanks.
Did you test in a real scenario? E.g. OTEL source accepting metrics, Remap modifying tags, OTEL sink receiving them and pushing them to a collector.
@pront hey! No, I haven't tested shipping to a real OTEL collector yet. I've gotten the PR to a stage where I probably need some help on what to do next. If there are docs I can reference for how to build Vector from this PR and test a config to see how it behaves when integrated with a collector, that'd definitely be helpful.
I believe it's all described in this section: https://vector.dev/docs/reference/configuration/sinks/opentelemetry/#quickstart. Steps 2 and 3 show how to run the OTEL collector, and step 4 shows how to run Vector. Happy to help more if needed.
fbs
left a comment
(I'm not part of the vector team but interested in seeing otel support improve)
Nice work!
```rust
        // Convert and record metrics
        self.convert_and_record_metrics(metrics);

        // The SDK handles the export asynchronously, so we just return a success response
```
Is this desirable behaviour for Vector? It seems like you lose end-to-end acks with this. Would it be better to handle the sending in Vector itself and only use the SDK for constructing and serializing?
Really good point. I'd like to keep acks and stay consistent with other sinks in terms of shipping flow. Will refine to just use the SDK for serializing.
I gave this an initial try and it might take some additional work to sort out. I believe this would require us to create an OTLP metric exporter that integrates with Vector's built-in buffer/acknowledgements/retry functionality. Right now we're making an exporter using their builder method, but we could likely make a dedicated exporter, without the builder, that is specific to Vector.
```rust
// Today's approach
let exporter = MetricExporter::builder()
    .with_http()
    .with_endpoint(endpoint)
    .with_temporality(Temporality::from(temporality))
    .build()
    .map_err(|e| crate::Error::from(format!("Failed to build metrics exporter: {}", e)))?;
```

Here's what we'd have to support:
```rust
// As seen in opentelemetry_otlp::metric
pub struct MetricExporter {
    client: Box<dyn MetricsClient>,
    temporality: Temporality,
}

impl Debug for MetricExporter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MetricExporter").finish()
    }
}

#[async_trait]
impl PushMetricExporter for MetricExporter {
    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
        self.client.export(metrics).await
    }

    async fn force_flush(&self) -> OTelSdkResult {
        // this component is stateless
        Ok(())
    }

    fn shutdown(&self) -> OTelSdkResult {
        self.client.shutdown()
    }

    fn temporality(&self) -> Temporality {
        self.temporality
    }
}
```
An additional challenge: this is not exported outside the otlp crate, meaning it's not quite this easy to make our own exporter metrics client.
```rust
#[async_trait]
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
    fn shutdown(&self) -> OTelSdkResult;
}
```
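One possible way around the private trait, sketched here with hypothetical names (`VectorMetricExporter` and `vector_http_service` are illustrative, not real code): implement the public `PushMetricExporter` trait directly on a Vector-owned type and skip `MetricsClient` entirely, reusing the trait surface shown above.

```rust
// Hypothetical sketch: `VectorHttpService` stands in for whatever Vector
// service would preserve buffering, retries, and end-to-end acks.
pub struct VectorMetricExporter {
    vector_http_service: VectorHttpService,
    temporality: Temporality,
}

#[async_trait]
impl PushMetricExporter for VectorMetricExporter {
    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
        // Hand the batch to Vector's own shipping machinery (hypothetical).
        self.vector_http_service.send(metrics).await
    }

    async fn force_flush(&self) -> OTelSdkResult {
        // Stateless; nothing is buffered here.
        Ok(())
    }

    fn shutdown(&self) -> OTelSdkResult {
        Ok(())
    }

    fn temporality(&self) -> Temporality {
        self.temporality
    }
}
```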
Does using the SDK provide a lot of value? I'm not sure its use case aligns with Vector that well. Trying to integrate it might be more hassle than it's worth.
Honestly, the more I use it, the more it feels like it's adding more hassle than help. The main benefit of the SDK is its built-in aggregation of metrics, which is helpful. Other than that, it seems to add a lot of abstraction.
While working on the logging support I came to the same conclusion. Let me see if I can get my PR open as a draft today; maybe we can converge on the same direction for it. I used the gcp stackdriver sink as a starting point for a sink that allows for custom encoding. Not sure if it's the right direction (adapting the http sink could also work), but maybe we can avoid some double work.
```rust
    ///
    /// This should be a full URL, including the protocol (e.g. `https://`).
    /// If not specified, metrics will not be sent.
    #[configurable(metadata(docs::examples = "http://localhost:4317/v1/metrics"))]
```
As an end user, this would make the sink itself even more confusing to me. The way it currently works already limits you to either logs or traces on the sink, as you have to set `protocol.uri` to `<addr>/v1/logs` or `/traces`. To control what you grab, you can use `inputs = <name>.logs`. It would be nice if it could integrate with the existing settings.
However, it's probably too much to solve all that in this PR. Maybe for now some nesting can help, with something like:

```yaml
type: Opentelemetry
metrics:
  endpoint: abc
  namespace: ...
```
We can just keep the one endpoint property and keep it simple for now; maybe nesting as a follow-up.
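A hypothetical sketch of that flat shape as a config struct (field names are illustrative only, not the sink's actual API):

```rust
use serde::Deserialize;

/// Illustrative only: a flat configuration keeping the single endpoint.
#[derive(Debug, Deserialize)]
struct OpenTelemetrySinkConfig {
    /// One endpoint; the URL path (`/v1/logs`, `/v1/metrics`, ...) selects
    /// which OTLP signal is sent.
    endpoint: String,
    /// Optional namespace prepended to metric names.
    namespace: Option<String>,
}
```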
This is interesting. We have a decision to make here. Do we want a single sink to support all types at the same time or an individual sink for logs/metrics/traces?
I would argue the latter is easier to support.
Looking at this whole PR, I think what we really want to leverage is the automatic internal mapping (encoding). As for the rest of the boilerplate code here, I am hoping we can probably get away with deleting it.
In case it helps, here is my setup.

OTEL collector:

```yaml
receivers:
  otlp:
    protocols:
      http:
        endpoint: "0.0.0.0:5318" # from python generator
exporters:
  debug:
  otlp:
    endpoint: localhost:4317
    tls:
      insecure: true
processors:
  batch: {} # Batch processor to optimize log export
service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
```

Generate fake logs:
```python
#!/usr/bin/env python3
import time
import requests
import json

def generate_log():
    # Specify the OTEL collector endpoint for HTTP
    otel_endpoint = "http://localhost:5318/v1/logs"
    unique_id = 0
    while True:
        # Generate a unique ID for the log message
        unique_id += 1
        # Create a log message with the unique ID
        log_entry = {
            "resourceLogs": [
                {
                    "resource": {
                        "attributes": [
                            {"key": "service.name", "value": {"stringValue": "python-log-generator"}}
                        ]
                    },
                    "scopeLogs": [
                        {
                            "scope": {"name": "example-log"},
                            "logRecords": [
                                {
                                    "timeUnixNano": int(time.time() * 1e9),
                                    "severityText": "INFO",
                                    "body": {"stringValue": f"This is a generated log message with ID: {unique_id}"},
                                    "attributes": [
                                        {"key": "unique_id", "value": {"intValue": unique_id}}
                                    ]
                                }
                            ]
                        }
                    ]
                }
            ]
        }
        # Send the log entry to the OTEL receiver
        try:
            response = requests.post(
                otel_endpoint,
                data=json.dumps(log_entry),
                headers={"Content-Type": "application/json"}
            )
            if response.status_code == 200:
                print(f"Success - Log ID: {unique_id}")
            else:
                print(f"Failed to send log: {response.status_code} {response.text}")
        except Exception as e:
            print(f"Error sending log: {e}")
        # Wait for 4 seconds before sending the next log
        time.sleep(4)

if __name__ == "__main__":
    generate_log()
```

The script above can be adjusted to produce fake metrics.
While testing this PR, the goal is to slowly improve the encoder to send payloads downstream that can be ingested.
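For reference, a hedged sketch of the metrics analogue of that payload, written in Rust with `serde_json` (field names follow the OTLP JSON mapping, where 64-bit integers are string-encoded; `fake_metric_payload` is a hypothetical helper, and exact details should be checked against the OTLP spec):

```rust
use serde_json::json;

// Hypothetical helper: builds one cumulative, monotonic sum data point,
// the metrics counterpart of the log payload in the script above.
fn fake_metric_payload(unique_id: u64, time_unix_nano: u128) -> serde_json::Value {
    json!({
        "resourceMetrics": [{
            "resource": {
                "attributes": [
                    {"key": "service.name", "value": {"stringValue": "metric-generator"}}
                ]
            },
            "scopeMetrics": [{
                "scope": {"name": "example-metric"},
                "metrics": [{
                    "name": "example.counter",
                    "sum": {
                        "aggregationTemporality": 2, // 2 = CUMULATIVE
                        "isMonotonic": true,
                        "dataPoints": [{
                            "timeUnixNano": time_unix_nano.to_string(),
                            "asInt": unique_id.to_string()
                        }]
                    }
                }]
            }]
        }]
    })
}
```

The resulting JSON would be POSTed to the collector's `/v1/metrics` path instead of `/v1/logs`.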
https://github.com/vectordotdev/vector/pull/22550/files#r2070655913
Forgot to add here: a sink per telemetry data type doesn't fully solve our issue. If metrics need additional config, then we probably need to group those settings. I hope this is straightforward, but we'll know for sure as development progresses.
Thank you for your feedback!! Please keep it comin with any ideas ya got.
pront
left a comment
I left some comments which I believe should help with the main open question here, i.e. how a sink can support multiple telemetry data types at the same time.
Force-pushed from 1720078 to ffe54be.
Summary

This PR adds OpenTelemetry metrics support to the OpenTelemetry sink, enabling direct export of Vector metrics to OpenTelemetry collectors.

Change Type

Is this a breaking change?

How did you test this PR?

Created unit tests for the OpenTelemetry metrics encoding and configuration and executed `cargo vdev test` successfully.

Does this PR include user facing changes?

Checklist

`make check-all` is a good command to run locally. This check is defined here. Some of these checks might not be relevant to your PR. For Rust changes, at the very least you should run:
- `cargo fmt --all`
- `cargo clippy --workspace --all-targets -- -D warnings`
- `cargo nextest run --workspace` (alternatively, you can run `cargo test --all`)

If this PR modifies dependencies (`Cargo.lock`), please run `dd-rust-license-tool write` to regenerate the license inventory and commit the changes (if any). More details here.

References