feat(health): SSE streaming log support. OtlpSink. FileSync #711

Merged
yoks merged 7 commits into NVIDIA:main from
mkoci:feat_health_streaming_sse_logs
Apr 17, 2026


Conversation

@mkoci
Contributor

@mkoci mkoci commented Mar 24, 2026

Description

Adds support for streaming Redfish LogEntries through SSE (Server-Sent Events). Introduces an OTLP log export pipeline that batches and ships collected log events to an OpenTelemetry collector over gRPC.
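The batch-and-ship behavior described above can be sketched in a simplified, synchronous form. `LogEvent`, `Batcher`, and the size/age thresholds are illustrative stand-ins, not the PR's actual types:

```rust
use std::time::{Duration, Instant};

// Illustrative stand-in for a collected log event.
#[derive(Debug, Clone)]
struct LogEvent {
    body: String,
}

// Accumulates events and decides when a batch should be exported.
struct Batcher {
    buf: Vec<LogEvent>,
    max_batch: usize,
    max_age: Duration,
    oldest: Option<Instant>,
}

impl Batcher {
    fn new(max_batch: usize, max_age: Duration) -> Self {
        Self { buf: Vec::new(), max_batch, max_age, oldest: None }
    }

    // Push an event; returns a full batch when a flush threshold is hit.
    fn push(&mut self, ev: LogEvent) -> Option<Vec<LogEvent>> {
        if self.buf.is_empty() {
            self.oldest = Some(Instant::now());
        }
        self.buf.push(ev);
        let too_old = self.oldest.map_or(false, |t| t.elapsed() >= self.max_age);
        if self.buf.len() >= self.max_batch || too_old {
            self.oldest = None;
            Some(std::mem::take(&mut self.buf))
        } else {
            None
        }
    }
}

fn main() {
    let mut b = Batcher::new(3, Duration::from_secs(5));
    assert!(b.push(LogEvent { body: "a".into() }).is_none());
    assert!(b.push(LogEvent { body: "b".into() }).is_none());
    let batch = b.push(LogEvent { body: "c".into() }).expect("third push flushes");
    println!("flushed batch of {}", batch.len()); // prints "flushed batch of 3"
}
```

In the real pipeline a flush hands the batch to the gRPC exporter; the time-based threshold keeps slow trickles of events from sitting in the buffer indefinitely.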

Breaking Changes

  • This PR contains breaking changes

Type of Change

  • Add - New feature or capability

Testing

  • Unit tests added/updated
  • Integration tests added/updated
  • Manual testing performed
  • No testing required (docs, internal refactor, etc.)

Additional Notes

  • We are now vendoring the OTLP proto types via tonic-prost-build rather than pinning tonic to avoid the opentelemetry-proto conflict.

@copy-pr-bot

copy-pr-bot Bot commented Mar 24, 2026

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@mkoci mkoci changed the title Feat health streaming sse logs feat(health): Experimental streaming log support via SSE (and it's messy implications) Mar 24, 2026
Contributor

@Matthias247 Matthias247 left a comment


I have not looked at everything so far.

But one high level question upfront: Is there a specific reason that SSE connections need to make the internal event handling asynchronous? My understanding is it could still be synchronous (the events are written to channels synchronously and can then be processed on the other side at arbitrary speed).

Is the reasoning here backpressure? If yes, it could likely be handled other ways too. E.g. we could implement a channel where we just drop the newest (or oldest) events if the processing infrastructure can't keep up. But @yoks is certainly the expert on how its designed so far and can provide more input.
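The drop-instead-of-backpressure channel suggested here can be sketched with a bounded `std::sync::mpsc` channel, where a full channel drops the newest event rather than blocking the producer. `send_or_drop` is a hypothetical helper, not code from this PR:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Send that never blocks the producer: if the bounded channel is full,
// the newest event is dropped and counted instead of applying backpressure.
fn send_or_drop<T>(tx: &SyncSender<T>, item: T, dropped: &mut u64) {
    if let Err(TrySendError::Full(_)) = tx.try_send(item) {
        *dropped += 1;
    }
}

fn main() {
    let (tx, rx): (SyncSender<u32>, Receiver<u32>) = sync_channel(2);
    let mut dropped = 0;
    for i in 0..5 {
        send_or_drop(&tx, i, &mut dropped);
    }
    // Capacity 2, five sends: the last three events were dropped.
    assert_eq!(dropped, 3);
    assert_eq!(rx.try_recv().unwrap(), 0);
    assert_eq!(rx.try_recv().unwrap(), 1);
    println!("dropped {dropped} events"); // prints "dropped 3 events"
}
```

Dropping the oldest instead would require a deque-style buffer rather than a plain channel; either way the producer side stays synchronous.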

Comment thread crates/health/src/collectors/runtime.rs
Comment thread crates/health/src/collectors/runtime.rs
Comment thread crates/health/src/collectors/runtime.rs Outdated
Comment thread crates/health/src/discovery/spawn.rs Outdated
@yoks
Contributor

yoks commented Mar 31, 2026

I think OTLP should be modelled as a sink, since it is async and can have backpressure. I think you can reuse OverrideQueue for that. Very similar to how HealthOverrides works.

@yoks
Contributor

yoks commented Mar 31, 2026

I have not looked at everything so far.

But one high level question upfront: Is there a specific reason that SSE connections need to make the internal event handling asynchronous? My understanding is it could still be synchronous (the events are written to channels synchronously and can then be processed on the other side at arbitrary speed).

Is the reasoning here backpressure? If yes, it could likely be handled other ways too. E.g. we could implement a channel where we just drop the newest (or oldest) events if the processing infrastructure can't keep up. But @yoks is certainly the expert on how its designed so far and can provide more input.

Yes, they should be synchronous as long as they are not hitting a slow sink (e.g. API/OTEL). I think the file sink should be fine as well. Answered above, but a design similar to how HealthOverride is handled looks like the way they can be handled.

@mkoci mkoci marked this pull request as ready for review April 2, 2026 02:54
@mkoci mkoci requested a review from a team as a code owner April 2, 2026 02:54
Copilot AI review requested due to automatic review settings April 2, 2026 02:54
@github-actions

github-actions Bot commented Apr 2, 2026

🔐 TruffleHog Secret Scan

No secrets or credentials found!

Your code has been scanned for 700+ types of secrets and credentials. All clear! 🎉


🕐 Last updated: 2026-04-02 02:55:54 UTC | Commit: 616afc7


Copilot AI left a comment


Pull request overview

Adds experimental Redfish LogEntries streaming via SSE to reduce bursty periodic log scrapes, and introduces an async OTLP (gRPC) export path with bounded-channel backpressure to protect the health service under load.

Changes:

  • Introduces an EventPipeline that preserves the existing synchronous sink behavior while optionally forwarding OTLP-relevant events through a bounded async channel for backpressure.
  • Adds a streaming-collector runtime abstraction + SSE log collector, with logs collection now configurable as sse (default) or periodic.
  • Implements OTLP log export (proto generation, event→OTLP conversion, drain task with batching/flush/retry).
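The SSE wire format the log collector consumes can be illustrated with a minimal parser for the `data:`/blank-line framing. This is a sketch of the framing only; the real collector maps Redfish EventService JSON payloads and handles reconnects:

```rust
// Minimal parser for the SSE wire format: consecutive `data:` lines
// accumulate into one event, and a blank line terminates the event.
fn parse_sse(input: &str) -> Vec<String> {
    let mut events = Vec::new();
    let mut data_lines: Vec<&str> = Vec::new();
    for line in input.lines() {
        if line.is_empty() {
            if !data_lines.is_empty() {
                events.push(data_lines.join("\n"));
                data_lines.clear();
            }
        } else if let Some(rest) = line.strip_prefix("data:") {
            data_lines.push(rest.trim_start());
        }
        // Other fields (`event:`, `id:`, comment lines) are ignored here.
    }
    events
}

fn main() {
    let stream = "data: {\"EventId\":\"1\"}\n\ndata: part1\ndata: part2\n\n";
    let events = parse_sse(stream);
    assert_eq!(events.len(), 2);
    assert_eq!(events[0], "{\"EventId\":\"1\"}");
    assert_eq!(events[1], "part1\npart2");
    println!("{} events", events.len()); // prints "2 events"
}
```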

Reviewed changes

Copilot reviewed 24 out of 26 changed files in this pull request and generated 5 comments.

File Description
Makefile.toml Formatting adjustments for env vars and rustfmt tasks.
crates/ssh-console/Cargo.toml Reformat tokio features list.
crates/health/src/processor/mod.rs Adds handle_and_collect to capture original + derived events for downstream forwarding.
crates/health/src/pipeline.rs New async EventPipeline that forwards OTLP-relevant events via bounded channel.
crates/health/src/otlp/mod.rs New OTLP module with generated proto includes + re-exports.
crates/health/src/otlp/drain.rs New OTLP drain task: connect, batch, flush, retry w/ backoff.
crates/health/src/otlp/convert.rs Converts internal events/logs into OTLP ExportLogsServiceRequest.
crates/health/src/lib.rs Wires EventPipeline, OTLP drain lifecycle, and SSE-related error type.
crates/health/src/discovery/spawn.rs Spawns logs collectors based on mode (SSE vs periodic) and injects pipeline.
crates/health/src/discovery/iteration.rs Threads EventPipeline through discovery iteration/spawn.
crates/health/src/config.rs Adds OTLP sink config + logs mode config/validation and updates tests.
crates/health/src/collectors/sensors.rs Switches sensor collector emission to async pipeline.
crates/health/src/collectors/runtime.rs Adds streaming collector trait/runtime, SSE open helper, and backoff utilities.
crates/health/src/collectors/nvue/rest/collector.rs Switches NVUE collector emission to async pipeline.
crates/health/src/collectors/nmxt.rs Switches NMX-T collector emission to async pipeline.
crates/health/src/collectors/mod.rs Re-exports streaming runtime types and new SSE log collector.
crates/health/src/collectors/logs/sse.rs New SSE log collector mapping Redfish EventService payloads into log events.
crates/health/src/collectors/logs/periodic.rs Periodic logs updated to use pipeline + optional file writer + rotation tweak.
crates/health/src/collectors/logs/mod.rs New logs module split (periodic + sse).
crates/health/src/collectors/firmware.rs Switches firmware collector emission to async pipeline.
crates/health/example/config.example.toml Updates example to document collectors.logs.mode and periodic sub-table.
crates/health/Cargo.toml Adds prost/tonic-prost deps and build dep for proto compilation.
crates/health/build.rs New build script fetching/compiling OTLP protos for generated gRPC client.
crates/health/benches/processor_pipeline.rs Adds bench for handle_and_collect overhead.
crates/bmc-explorer/Cargo.toml Reformat nv-redfish feature list.
Cargo.lock Locks new prost/tonic-prost deps (and related lockfile churn).
Comments suppressed due to low confidence (1)

crates/health/src/collectors/logs/periodic.rs:612

  • last_seen_ids is updated even when writing the log batch to disk fails. If the log file is a required output, this can permanently skip entries on the next iteration (data loss on disk). Consider only advancing last_seen_ids (and total_log_count) after a successful write, or otherwise making the failure semantics explicit (e.g., retry on next run when the writer errors).
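The ordering fix suggested here can be sketched as follows; `flush`, `write_batch`, and the `(id, entry)` shape are hypothetical stand-ins for the periodic collector's actual types:

```rust
use std::collections::HashSet;

// Only advance `last_seen_ids` after the write succeeds, so a failed
// write retries the same entries on the next iteration instead of
// silently skipping them (data loss on disk).
fn flush(
    batch: &[(u64, String)],
    last_seen_ids: &mut HashSet<u64>,
    write_batch: impl Fn(&[(u64, String)]) -> Result<(), String>,
) -> Result<(), String> {
    write_batch(batch)?; // on error, return before touching the cursor
    for (id, _) in batch {
        last_seen_ids.insert(*id); // advance only after a successful write
    }
    Ok(())
}

fn main() {
    let batch = vec![(1, "entry".to_string())];
    let mut seen = HashSet::new();

    // Failing writer: cursor must not advance.
    assert!(flush(&batch, &mut seen, |_| Err("disk full".into())).is_err());
    assert!(seen.is_empty());

    // Succeeding writer: cursor advances.
    flush(&batch, &mut seen, |_| Ok(())).unwrap();
    assert!(seen.contains(&1));
}
```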


Comment thread crates/health/src/lib.rs Outdated
Comment thread crates/health/src/collectors/runtime.rs
Comment thread crates/health/build.rs Outdated
Comment thread crates/health/src/otlp/drain.rs Outdated
Comment thread crates/health/src/config.rs
@mkoci
Contributor Author

mkoci commented Apr 4, 2026

I think OTLP should be modelled as a sink, since it is async and can have backpressure. I think you can reuse OverrideQueue for that. Very similar to how HealthOverrides works.

From the PR description I added after this comment:

What about something like the OverrideQueue?
The OverrideQueue uses a unique key to loosely bound Overrides. This works because there is a finite number of MachineIds and ReportSources and we only care about the latest. For logs / events this leads to dropping, not to mention the queue size for logs / events has much more freedom compared to the bounded machine/report uniqueness dimension.

If we don't care about dropping, this is much easier to implement, similar to the OverrideQueue. Is your expectation that we only hold the latest events in the queue per MachineId to push to the OtlpSink and drop the rest?
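The trade-off described here can be made concrete with a minimal keyed latest-value queue; `LatestPerKey` is an illustrative stand-in, not the actual `OverrideQueue` API:

```rust
use std::collections::HashMap;

// A queue keyed by a finite identifier: a new value for an existing key
// replaces the old one, so the structure stays bounded by the number of
// keys, but older values for a key are dropped. Fine for overrides,
// lossy for a log/event stream.
struct LatestPerKey<K, V> {
    slots: HashMap<K, V>,
}

impl<K: std::hash::Hash + Eq, V> LatestPerKey<K, V> {
    fn new() -> Self {
        Self { slots: HashMap::new() }
    }

    // Insert or overwrite; returns the value that was displaced, if any.
    fn push(&mut self, key: K, value: V) -> Option<V> {
        self.slots.insert(key, value)
    }

    fn len(&self) -> usize {
        self.slots.len()
    }
}

fn main() {
    let mut q = LatestPerKey::new();
    q.push("machine-a", "event 1");
    q.push("machine-b", "event 2");
    // A second event for machine-a displaces the first, which is exactly
    // the dropping behavior in question for log entries.
    let displaced = q.push("machine-a", "event 3");
    assert_eq!(displaced, Some("event 1"));
    assert_eq!(q.len(), 2);
}
```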

Contributor

@kensimon kensimon left a comment


Haven't looked at the code yet but this really stuck out to me, I think we ought to vendor these files rather than shelling out to curl on every build.

Comment thread crates/health/build.rs Outdated
Contributor

@kensimon kensimon left a comment


These are all nitpicks and can be ignored, only the curl-in-build-rs issue is a "request changes" from me.

Comment thread crates/health/src/otlp/drain.rs Outdated
Comment thread crates/health/src/otlp/drain.rs Outdated
Comment thread crates/health/src/otlp/drain.rs
Comment thread crates/health/src/otlp/drain.rs Outdated
Comment thread crates/health/src/otlp/drain.rs
Comment thread crates/health/src/collectors/runtime.rs Outdated
Comment thread crates/health/src/collectors/runtime.rs Outdated
Comment thread crates/health/src/collectors/runtime.rs Outdated
Comment thread crates/health/src/collectors/runtime.rs Outdated
Comment thread crates/health/src/collectors/runtime.rs Outdated
mkoci added 3 commits April 16, 2026 04:59
Add criterion benchmarks for OtlpSink enqueue throughput (low/high
contention) and queue key construction latency. Remove the stale
handle_and_collect benchmark that referenced a deleted method.

Signed-off-by: mkoci <mkoci@nvidia.com>
@mkoci mkoci force-pushed the feat_health_streaming_sse_logs branch from 3ed7f31 to 8ab95ce on April 16, 2026 03:26
@mkoci mkoci changed the title feat(health): Experimental streaming log support via SSE (and it's messy implications) feat(health): SSE streaming log support. OtlpSink. FileSync Apr 16, 2026
- convert StreamingCollector::connect to #[async_trait]; drop the
  ConnectFuture Pin<Box> type alias. No blocker surfaced on
  re-evaluation -- async-trait is already a workspace dep and the
  trait is only used generically, never as dyn.
- replace the four tokio::select! blocks in
  Collector::start_streaming with CancellationToken::run_until_cancelled
  per kensimon's suggestions.
- consolidate batch.clear() to a single call site in
  OtlpDrainTask::flush(), right after build_export_request.
- prefer HealthReportProcessor::default() in benches (the struct
  derives Default).

Signed-off-by: mkoci <mkoci@nvidia.com>
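The retry-with-backoff behavior the drain task's flush loop relies on can be sketched as a capped exponential delay schedule. The base and cap values are illustrative, not the PR's actual configuration, and a production version would typically add jitter:

```rust
use std::time::Duration;

// Capped exponential backoff: the delay doubles per attempt up to a
// ceiling, so a persistently failing export settles at the cap instead
// of growing without bound.
fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    let exp = base.saturating_mul(1u32.checked_shl(attempt).unwrap_or(u32::MAX));
    exp.min(cap)
}

fn main() {
    let base = Duration::from_millis(100);
    let cap = Duration::from_secs(5);
    let delays: Vec<_> = (0..8).map(|a| backoff_delay(a, base, cap)).collect();
    assert_eq!(delays[0], Duration::from_millis(100));
    assert_eq!(delays[1], Duration::from_millis(200));
    assert_eq!(delays[5], Duration::from_millis(3200));
    assert_eq!(delays[6], cap); // 6400 ms, capped at 5 s
    println!("{:?}", delays);
}
```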
mkoci added a commit to mkoci/infra-controller-core that referenced this pull request Apr 17, 2026
Replace the per-endpoint connection_state Gauge<f64> (which encoded
browser EventSource readyState as 0/1/2) with a single service-level
active_sse_connections IntGauge on MetricsManager. This answers the
actual operational question -- "how many SSE streams are live?" -- as
a gauge read rather than a `count(connection_state == 1)` aggregation
across thousands of endpoints.

Reconnect-loop detection moves to the existing per-endpoint counters:

    # live streams
    carbide_hardware_health_active_sse_connections

    # endpoints reconnecting but processing zero events
    rate(carbide_hardware_health_stream_reconnections_total[5m]) > 0
      and rate(carbide_hardware_health_stream_items_processed_total[5m]) == 0

Details:
- remove STREAM_STATE_{CONNECTING,OPEN,CLOSED} constants and the
  connection_state Gauge from StreamMetrics + its registration.
- add active_sse_connections IntGauge to MetricsManager with an
  active_sse_connections() accessor.
- add SseConnectionGuard (RAII): inc on construction, dec on Drop.
  Construct inside the Ok(mut stream) arm of start_streaming so the
  guard's lifetime is precisely the connected phase; drop covers every
  exit path (cancel, stream error, stream end, task panic) and fires
  before the reconnect backoff sleep.
- introduce StreamingCollectorStartContext, mirroring the existing
  CollectorStartContext pattern, to keep start_streaming within the
  clippy::too_many_arguments limit while threading the new gauge.

Addresses:
- Matthias247 NVIDIA#711 (r3017733441) -- oddly specific connection-state metric
- kensimon NVIDIA#711 (r3040326749)   -- IntGauge for discrete state (field deleted entirely)

Signed-off-by: mkoci <mkoci@nvidia.com>
@mkoci
Contributor Author

mkoci commented Apr 17, 2026

I have not looked at everything so far.

But one high level question upfront: Is there a specific reason that SSE connections need to make the internal event handling asynchronous? My understanding is it could still be synchronous (the events are written to channels synchronously and can then be processed on the other side at arbitrary speed).

Is the reasoning here backpressure? If yes, it could likely be handled other ways too. E.g. we could implement a channel where we just drop the newest (or oldest) events if the processing infrastructure can't keep up. But @yoks is certainly the expert on how its designed so far and can provide more input.

I don't think there's a great reason anymore. Just because we can, doesn't mean we should.

"if you need to backpressure Redfish, you're already broken" - Ed Tanous

Replace the per-endpoint connection_state Gauge<f64> (which encoded
browser EventSource readyState as 0/1/2) with a per-endpoint
stream_connected IntGauge that is 1 while the stream is live and 0
otherwise. The gauge is colocated with the existing per-stream counters
on StreamMetrics and carries the same {collector_type, endpoint_key}
labels, so all four SSE metrics are registered and torn down together
with the owning CollectorRegistry -- no new leak surface relative to
what already exists (tracked in NVIDIA#989).

Per endpoint remains consistent with yoks' guidance that "collectors
create per-endpoint, so this should be fine" and lets rack validation
filter on a specific endpoint_key to see connection state over time.
Service-wide count is derivable as `sum(stream_connected)`.

Details:
- remove STREAM_STATE_{CONNECTING,OPEN,CLOSED} constants.
- swap the connection_state Gauge<f64> field on StreamMetrics for an
  IntGauge `connected` with the same const labels and same registry
  lifecycle.
- add SseConnectionGuard (RAII): inc on construction, dec on Drop.
  Construct inside the Ok(mut stream) arm of start_streaming so the
  guard's lifetime is precisely the connected phase; Drop covers every
  exit path (cancel, stream error, stream end, task panic) and fires
  before the reconnect backoff sleep, so an endpoint in a reconnect
  loop correctly reports 0 during backoff.
- introduce StreamingCollectorStartContext, mirroring the existing
  CollectorStartContext pattern, to keep start_streaming readable.

Grafana:

    # service-wide active count
    sum(carbide_hardware_health_stream_connected)

    # per-endpoint liveness
    carbide_hardware_health_stream_connected{endpoint_key="..."}

    # endpoints reconnecting but processing zero events
    rate(carbide_hardware_health_stream_reconnections_total[5m]) > 0
      and rate(carbide_hardware_health_stream_items_processed_total[5m]) == 0

Addresses:
- Matthias247 NVIDIA#711 (r3017733441) -- readyState integer encoding dropped
- kensimon NVIDIA#711 (r3040326749)   -- IntGauge (0/1) instead of Gauge<f64>

Signed-off-by: mkoci <mkoci@nvidia.com>
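The RAII guard described in this commit can be sketched with an `AtomicI64` standing in for the Prometheus `IntGauge`:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

// Increment a gauge on construction, decrement on Drop, so every exit
// path (error, cancellation, panic unwind) restores the count without
// needing an explicit decrement at each return site.
struct SseConnectionGuard {
    gauge: Arc<AtomicI64>,
}

impl SseConnectionGuard {
    fn new(gauge: Arc<AtomicI64>) -> Self {
        gauge.fetch_add(1, Ordering::Relaxed);
        Self { gauge }
    }
}

impl Drop for SseConnectionGuard {
    fn drop(&mut self) {
        self.gauge.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let connected = Arc::new(AtomicI64::new(0));
    {
        let _guard = SseConnectionGuard::new(connected.clone());
        assert_eq!(connected.load(Ordering::Relaxed), 1); // stream is "live"
        // ...stream events until error or cancel; the guard drops on any exit...
    }
    // Guard dropped before the reconnect backoff would sleep, so an
    // endpoint in a reconnect loop reports 0 during backoff.
    assert_eq!(connected.load(Ordering::Relaxed), 0);
}
```

Constructing the guard inside the `Ok(mut stream)` arm, as the commit describes, scopes its lifetime to exactly the connected phase.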
@mkoci mkoci force-pushed the feat_health_streaming_sse_logs branch from ea44a25 to b161caf on April 17, 2026 14:02
@mkoci
Contributor Author

mkoci commented Apr 17, 2026

Finally found time to revisit this one!

Re-requesting reviews from @Matthias247 @yoks @kensimon, perhaps @chet might take a look as well?

TLDR;

  • I rebased commits to include the removal of the fever dream of an async pipeline and backpressure on machines for streaming
  • Refactored the OtlpSink to use the OverrideQueue (based on the HealthOverrideSink) bounded by (Machine)x(concerns per Machine)
  • LogFileSink implemented (@Matthias247 @yoks)
  • StreamingCollector::connect now uses an async trait (@yoks is this okay?)
  • SSE connection observability redesign (addresses many comment threads)

Many nits were addressed, one remains. Need feedback from @kensimon here

Issues Opened

@mkoci mkoci requested a review from kensimon April 17, 2026 16:24
@mkoci mkoci requested review from Matthias247 and yoks April 17, 2026 16:25
Contributor

@kensimon kensimon left a comment


LGTM, thanks for making those changes in build.rs.

Comment thread crates/health/src/otlp/drain.rs
@Matthias247
Contributor

Thanks for the updates. I don't think I'll have time to look at it in detail today, but if the previous comments have been addressed, I'm fine if @yoks and @kensimon are.

@joseph-shifflett

can this be merged now? @Matthias247 @yoks @kensimon

@yoks
Contributor

yoks commented Apr 17, 2026

One thing for the future: look at the proto/gRPC setup. Most likely health will need to provide a streaming gRPC server for health data, so we will need to unify it. But that is not part of this PR; it is big as it is.

@yoks
Contributor

yoks commented Apr 17, 2026

I can merge as soon as Matt says it is ready

@yoks yoks merged commit d18490b into NVIDIA:main Apr 17, 2026
44 checks passed
@mkoci
Contributor Author

mkoci commented Apr 18, 2026

One thing for the future: look at the proto/gRPC setup. Most likely health will need to provide a streaming gRPC server for health data, so we will need to unify it. But that is not part of this PR; it is big as it is.

Do you mean health will be a standalone gRPC server for outputting telemetry?

I have another stacked code change. After I refactor it, it will provide NVOS gNMI streaming support for Switch hosts - health as a gRPC client collector.

