diff --git a/docs/execplans/11-1-2-structured-logging-and-tracing-spans.md b/docs/execplans/11-1-2-structured-logging-and-tracing-spans.md new file mode 100644 index 00000000..fe644e8d --- /dev/null +++ b/docs/execplans/11-1-2-structured-logging-and-tracing-spans.md @@ -0,0 +1,751 @@ +# 11.1.2 Structured logging and tracing spans + +This ExecPlan (execution plan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. + +Status: COMPLETE + +## Purpose / big picture + +Roadmap item 11.1.2 requires structured logging and tracing spans around client +connect, call, send, receive, close, and stream lifecycle events, plus +configuration for per-command timing. After this change, every client operation +emits a `tracing` span with structured fields (frame size, correlation ID, +operation result, peer address, stream frame count). Users optionally enable +per-command elapsed-time events via a `TracingConfig` builder method. When no +`tracing` subscriber is +installed, all instrumentation is zero-cost. + +Observable success: a user configures `TracingConfig` on the builder, performs +connect/send/receive/call/streaming/close, and each operation produces a +tracing span at the configured level with structured fields. Timing events +appear when enabled. All existing tests pass unchanged. New unit tests using +rstest with `tracing-test`, and behaviour-driven development (BDD) tests using +rstest-bdd v0.5.0, validate the instrumentation. + +## Constraints + +Hard invariants. Violation requires escalation, not workarounds. + +- No single source file may exceed 400 lines. +- Public APIs must be documented with rustdoc (`///`) including examples. +- Every module must begin with a `//!` module-level comment. +- All code must pass `make check-fmt`, `make lint` (clippy `-D warnings`), + and `make test`. +- Comments and documentation must use en-GB-oxendict spelling. +- Tests must use `rstest` fixtures; BDD tests use `rstest-bdd` v0.5.0. +- Existing public API signatures must not change (backwards compatibility). +- Clients configured without tracing must behave identically to today. +- The `builder_field_update!` macro in `src/client/builder/mod.rs` must be + updated for every arm when a new field is added to the builder struct. +- No new external crate dependencies (`tracing`, `tracing-subscriber`, and + `tracing-test` are already in `Cargo.toml`). +- Design decisions must be recorded in `docs/wireframe-client-design.md`. +- `docs/users-guide.md` must be updated with the new public API surface. +- `docs/roadmap.md` item 11.1.2 must be marked done only after all gates pass. + +## Tolerances (exception triggers) + +- Scope: if implementation requires changes to more than 30 files (net new + + modified), stop and escalate. +- Interface: if any existing public API signature must change (not just adding + new methods/types), stop and escalate. +- Dependencies: if a new external crate is required, stop and escalate. +- Iterations: if tests still fail after 3 attempts at a given milestone, stop + and escalate. +- Time: if any single stage exceeds 4 hours elapsed, stop and escalate. +- Line budget: if any source file is projected to exceed 390 lines, extract + code to a submodule before continuing. + +## Risks + +- Risk: `Span::enter()` guard held across `.await` in async methods. + Severity: medium. Likelihood: low. Mitigation: client methods use + `tracing::Instrument::instrument(span)` to wrap async futures rather than + holding a `Span::enter()` guard across `.await` points. This ensures the span + is entered only while the future is polled and exited between polls, which is + safe under multithreaded runtimes. + +- Risk: `runtime.rs` exceeds 400 lines after adding instrumentation. + Severity: high. Likelihood: low. Mitigation: current is 354 lines, estimated + addition is ~26 lines = ~380. If it approaches 400, extract `close()` + instrumentation to a dedicated `src/client/close.rs` file. + +- Risk: `dynamic_span!` macro causes binary bloat from 5 instantiations per + call site. Severity: low. Likelihood: medium. Mitigation: each branch is a + single `tracing::*_span!` call. Link-time optimization (LTO) eliminates + unused branches. Total call sites: 8 — negligible impact. + +- Risk: BDD `tracing_subscriber` setup conflicts with other parallel tests. + Severity: medium. Likelihood: medium. Mitigation: use + `tracing::subscriber::set_default()` (thread-local, not global). The + `DefaultGuard` is stored in the world fixture and dropped when the world is + dropped, restoring the previous subscriber. Each test has its own subscriber. + +- Risk: BDD fixture parameter naming mismatch. + Severity: high. Likelihood: medium. Mitigation: known gotcha — `rstest-bdd` + resolves step function parameters by name, not type. All step functions must + use `client_tracing_world` (matching the fixture function name exactly). + +## Progress + +- [x] Stage A: scaffolding — `TracingConfig` type, helpers, builder plumbing +- [x] Stage B: runtime integration — instrument all client methods with spans +- [x] Stage C: unit tests (rstest + `tracing-test`) +- [x] Stage D: BDD tests (rstest-bdd v0.5.0) +- [x] Stage E: documentation — users-guide, client-design, roadmap + +## Surprises & discoveries + +- `tracing-test`'s `FmtSubscriber` only outputs log lines when events are + emitted within spans. Span names appear as context prefixed to event lines. + Unit tests must enable per-command timing (or rely on other events) to + produce observable output within spans for assertion. + +- `clippy::cognitive_complexity` fires on every function using the + `dynamic_span!` macro because the 5-branch match expands to complexity 17. + Each span factory function requires `#[expect(clippy::cognitive_complexity)]`. + +- `ResponseStream::poll_next` gained enough complexity from the added + tracing events to trigger `clippy::cognitive_complexity` (10/9). Added an + `#[expect]` attribute. + +- `tracing::Level` is `Copy`, not just `Clone`. The plan initially used + `.clone()` in `with_all_levels()`, but clippy flagged `clone_on_copy`. + +- Timing events should fire on error paths too, not just success. The initial + implementation only emitted timing on the success path in + `receive_internal()`, causing the error-path tracing test to fail. Fixed by + adding `emit_timing_event` calls to all error branches. + +## Decision log + +- Decision: use `tracing` spans with a `dynamic_span!` macro for dynamic level + selection, not compile-time-only levels. Rationale: the `tracing` crate + requires compile-time level constants in `span!` macros. To support + user-configurable levels per operation, a `macro_rules!` macro matches on the + five `Level` variants, delegating to the corresponding + `tracing::_span!` macro per branch. Each branch has static metadata + while branch selection is dynamic. + +- Decision: timing events always emit at `DEBUG` level, regardless of span + level. Rationale: timing is opt-in diagnostic data. Emitting at the span's + configured level could produce unexpected `INFO`-level timing events in + production when a user only intended `INFO` for connection lifecycle. `DEBUG` + is the correct diagnostic level. + +- Decision: `ResponseStream` emits per-frame events (not spans) at `DEBUG` + level. Rationale: creating spans inside `poll_next` is problematic because it + is synchronous and called many times. Events are lightweight and appropriate + for per-frame diagnostics. + +- Decision: add a `frame_count` field and `frame_count()` accessor to + `ResponseStream`. Rationale: enables structured `stream.frames_received` and + `stream.frames_total` fields in tracing events. Also, useful as a public API + for users who want to track stream progress without tracing. + +- Decision: place span factory functions in a dedicated `tracing_helpers.rs` + module, not inline in each method. Rationale: centralizes span creation, + keeps instrumentation logic out of the hot-path methods, and enables + consistent span naming and field conventions. + +## Outcomes & retrospective + +All five stages completed. Final validation passed all gates: + +- `make check-fmt`: exit 0 +- `make lint` (clippy `-D warnings` + rustdoc `-D warnings`): exit 0 +- `make markdownlint`: 0 errors +- `make test`: 0 failures (387 unit tests + 146 integration tests + + 6 BDD scenarios) + +Files created: 8. Files modified: 15. Total: 23 (within the 30-file tolerance). +All source files remain under 400 lines. + +**What worked well:** + +- The `dynamic_span!` macro strategy cleanly solved the compile-time level + requirement while keeping per-call-site metadata static. +- Centralizing span factories in `tracing_helpers.rs` kept the runtime + methods clean and consistent. +- The `builder_field_update!` macro pattern made builder plumbing + straightforward once all three arms were identified. + +**What required adaptation:** + +- `tracing-test` captures formatted event output, not raw span creation. + Tests had to enable per-command timing for the operation under test so that + timing events emitted within spans made span names visible in captured output. +- Timing events needed to fire on error paths too. The initial + implementation only emitted `elapsed_us` on the success path in + `receive_internal()`. Three error branches were missing `emit_timing_event()` + calls. +- The `poll_next` method in `response_stream.rs` exceeded clippy's + cognitive complexity threshold (10 vs limit 9) after adding per-frame tracing + events. An `#[expect]` annotation was added with a reason. + +**What would be done differently:** + +- Begin with a spike test for `tracing-test` capture semantics before + writing all 15 unit tests. Understanding the event-not-span capture model + earlier would have avoided a full rewrite of the test assertions. + +## Context and orientation + +The wireframe crate (run from the repository root) is a Rust async networking framework +using Tokio. The client subsystem lives in `src/client/`: + +```plaintext +src/client/ + mod.rs — module root, public re-exports (82 lines) + runtime.rs — WireframeClient struct, send/receive/call/close (354 lines) + messaging.rs — send_envelope, receive_envelope, call_correlated (284 lines) + streaming.rs — call_streaming, receive_streaming (150 lines) + response_stream.rs — ResponseStream impl Stream, poll_next (168 lines) + hooks.rs — LifecycleHooks, RequestHooks, type aliases (180 lines) + error.rs — ClientError enum (130 lines) + tracing_config.rs — NEW: TracingConfig struct + tracing_helpers.rs — NEW: span factories and timing helpers + builder/ + mod.rs — builder_field_update! macro (65 lines) + core.rs — WireframeClientBuilder struct (63 lines) + connect.rs — connect() method (101 lines) + lifecycle.rs — on_connection_setup/teardown/error (131 lines) + request_hooks.rs — before_send/after_receive builder methods (79 lines) + tracing.rs — NEW: tracing_config() builder method + codec.rs — codec config builder methods (58 lines) + serializer.rs — serializer() builder method + preamble.rs — preamble builder methods + tests/ + mod.rs — test module root (175 lines) + helpers.rs — test helpers (169 lines) + tracing.rs — NEW: tracing unit tests +``` + +BDD tests follow this four-file structure: + +```plaintext +tests/ + features/ — Gherkin .feature files + fixtures/ — World structs with #[fixture] fns and step methods + steps/ — #[given], #[when], #[then] step definitions + scenarios/ — #[scenario] bindings linking features to fixtures +``` + +Key dependencies already available: `tracing` (0.1.41 with `log-always` +feature), `tracing-subscriber` (0.3.18), `tracing-test` (0.2.5 dev-dep). + +Data flow for send: +`message → serializer.serialize() → Vec → invoke_before_send_hooks() → framed.send()` + +Data flow for receive: +`framed.next() → BytesMut → invoke_after_receive_hooks()` +`→ serializer.deserialize() → message` + +## Plan of work + +### Stage A: scaffolding — types, helpers, and builder plumbing + +**A1. Create `src/client/tracing_config.rs`** (~115 lines) + +Define `TracingConfig` with per-operation level and timing fields: + +```rust +#[derive(Clone, Debug)] +pub struct TracingConfig { + pub(crate) connect_level: Level, + pub(crate) send_level: Level, + pub(crate) receive_level: Level, + pub(crate) call_level: Level, + pub(crate) streaming_level: Level, + pub(crate) close_level: Level, + pub(crate) connect_timing: bool, + pub(crate) send_timing: bool, + pub(crate) receive_timing: bool, + pub(crate) call_timing: bool, + pub(crate) streaming_timing: bool, + pub(crate) close_timing: bool, +} +``` + +Default: `INFO` for connect/close (lifecycle), `DEBUG` for send/receive/call/ +streaming (data operations), timing disabled for all. + +Provide 14 `#[must_use] pub fn with_*` setter methods (one level + one timing +per operation) and two convenience methods: `with_all_levels(Level)` and +`with_all_timing(bool)`. All documented with rustdoc examples. + +**A2. Create `src/client/tracing_helpers.rs`** (~130 lines) + +A private `dynamic_span!` macro that matches on the five `tracing::Level` +variants, delegating to the corresponding `tracing::_span!` macro: + +```rust +macro_rules! dynamic_span { + ($level:expr, $name:expr $(, $($field:tt)*)?) => { + match $level { + Level::ERROR => tracing::error_span!($name $(, $($field)*)?), + Level::WARN => tracing::warn_span!($name $(, $($field)*)?), + Level::INFO => tracing::info_span!($name $(, $($field)*)?), + Level::DEBUG => tracing::debug_span!($name $(, $($field)*)?), + Level::TRACE => tracing::trace_span!($name $(, $($field)*)?), + } + }; +} +``` + +Eight `pub(crate)` span factory functions: + +| Function | Span name | Structured fields | +| ---------------------- | ------------------------ | -------------------------------------- | +| `connect_span` | `client.connect` | `peer.addr` | +| `send_span` | `client.send` | `frame.bytes` | +| `receive_span` | `client.receive` | `frame.bytes=Empty`, `result=Empty` | +| `send_envelope_span` | `client.send_envelope` | `correlation_id`, `frame.bytes` | +| `call_span` | `client.call` | `result=Empty` | +| `call_correlated_span` | `client.call_correlated` | `correlation_id=Empty`, `result=Empty` | +| `streaming_span` | `client.call_streaming` | `correlation_id`, `frame.bytes=Empty` | +| `close_span` | `client.close` | (none) | + +One `pub(crate) fn emit_timing_event(start: Option)` that emits a +`tracing::debug!` event with `elapsed_us` when `start` is `Some`. + +**A3. Create `src/client/builder/tracing.rs`** (~55 lines) + +One builder method `tracing_config(self, config: TracingConfig) -> Self` with +rustdoc and example. + +**A4. Modify `src/client/builder/core.rs`** (63 → ~67 lines) + +Add `pub(crate) tracing_config: TracingConfig` field to +`WireframeClientBuilder`. Initialize as `TracingConfig::default()` in `new()`. + +**A5. Modify `src/client/builder/mod.rs`** (65 → ~75 lines) + +Add `tracing_config: $self.tracing_config,` to all three arms of the +`builder_field_update!` macro (serializer, preamble_config, lifecycle_hooks). +Add `mod tracing;` to the submodule list. + +**A6. Modify `src/client/mod.rs`** (82 → ~86 lines) + +Add `mod tracing_config;` and `mod tracing_helpers;`. Add +`pub use tracing_config::TracingConfig;` to the public re-exports. + +**A7. Validate stage A.** + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log; echo "check-fmt exit: $?" +make lint 2>&1 | tee /tmp/wireframe-lint.log; echo "lint exit: $?" +make test 2>&1 | tee /tmp/wireframe-test.log; echo "test exit: $?" +``` + +Expected: all three exit code 0. Existing tests pass unchanged. + +### Stage B: runtime integration — instrument all client methods + +**B1. Modify `src/client/builder/connect.rs`** (101 → ~120 lines) + +At the start of `connect()`, create a connect span and optional timing start. +Thread `tracing_config` through the `WireframeClient` constructor. Use +`tracing::Instrument::instrument(span)` to wrap the async connect future rather +than holding a `Span::enter()` guard across `.await`. + +```rust +let span = connect_span(&self.tracing_config, &addr.to_string()); +let start = self.tracing_config.connect_timing.then(Instant::now); +let result = self.connect_inner(addr).instrument(span).await; +emit_timing_event(start); +result +``` + +**B2. Modify `src/client/runtime.rs`** (354 → ~380 lines) + +Add `pub(crate) tracing_config: TracingConfig` field to `WireframeClient`. + +Instrument `send()`: after serialization produces `bytes`, create `send_span` +with `frame.bytes`, emit timing on success. + +Instrument `call()`: create `call_span` wrapping the send+receive pair, record +`result` on completion. + +Instrument `close()`: create `close_span`, emit timing. + +**B3. Modify `src/client/messaging.rs`** (284 → ~330 lines) + +Instrument `send_envelope()`: after correlation ID is resolved and +serialization succeeds, create `send_envelope_span` with `correlation_id` and +`frame.bytes`. + +Instrument `receive_internal()`: create `receive_span`, record `frame.bytes` +after frame arrives, record `result` on success/failure. + +Instrument `call_correlated()`: create `call_correlated_span` with +`correlation_id=Empty`, record the actual ID after `send_envelope` returns, +record `result` on completion. + +**B4. Modify `src/client/streaming.rs`** (150 → ~178 lines) + +Instrument `call_streaming()`: after correlation ID is resolved, create +`streaming_span` with `correlation_id`, record `frame.bytes` after +serialization. + +**B5. Modify `src/client/response_stream.rs`** (168 → ~195 lines) + +Add `frame_count: usize` field to `ResponseStream`, initialized to 0. Add +`pub fn frame_count(&self) -> usize` accessor. + +In `poll_next()`, on `Ok(bytes)`: increment frame count, emit `tracing::debug!` +event with `frame.bytes`, `stream.frames_received`, `correlation_id`. + +In `process_frame()`, when terminator detected: emit `tracing::debug!` event +with `stream.frames_total` and `correlation_id`. + +**B6. Validate stage B.** + +Same validation commands as A7. All existing tests must still pass. + +### Stage C: unit tests (rstest + `tracing-test`) + +**C1. Create `src/client/tests/tracing.rs`** (~280 lines) + +Uses `#[traced_test]` from `tracing-test` combined with `#[rstest]` and +`#[tokio::test]`. The `tracing` crate has `features = ["log", "log-always"]` so +all events bridge to `log` and are captured by `tracing-test`. + +Test cases (15): + +1. `connect_emits_span_with_peer_address` — `logs_assert` finds + `"client.connect"` and the peer address. +2. `send_emits_span_with_frame_bytes` — finds `"client.send"` and + `"frame.bytes"`. +3. `receive_emits_span_with_frame_bytes_and_result` — finds + `"client.receive"`, `"frame.bytes"`, `"result"`. +4. `call_emits_wrapping_span` — finds `"client.call"` and `"result"`. +5. `call_correlated_emits_span_with_correlation_id` — finds + `"client.call_correlated"` and `"correlation_id"`. +6. `send_envelope_emits_span_with_correlation_id_and_frame_bytes` — finds + `"client.send_envelope"`, `"correlation_id"`, `"frame.bytes"`. +7. `call_streaming_emits_span_with_correlation_id` — finds + `"client.call_streaming"` and `"correlation_id"`. +8. `close_emits_span` — finds `"client.close"`. +9. `receive_error_records_result_err` — server drops connection, finds + `"result"` indicating error. +10. `timing_disabled_by_default` — no line contains `"elapsed_us"`. +11. `timing_enabled_emits_elapsed_us_for_send` — with + `with_send_timing(true)`, finds `"elapsed_us"`. +12. `timing_enabled_for_connect` — with `with_connect_timing(true)`, finds + `"elapsed_us"`. +13. `all_timing_convenience_enables_all_operations` — with + `with_all_timing(true)`, multiple `"elapsed_us"` lines. +14. `response_stream_emits_frame_events` — streaming server sends 3 frames + + terminator; finds 3 `"stream frame received"` and 1 + `"stream terminated"` with `"stream.frames_total=3"`. +15. `default_config_is_backwards_compatible` — no builder tracing call, echo + works, no panic. + +**C2. Modify `src/client/tests/mod.rs`** (175 → ~176 lines) + +Add `mod tracing;`. + +**C3. Validate stage C.** + +Same validation commands. All 15 new tests pass alongside existing tests. + +### Stage D: BDD tests (rstest-bdd v0.5.0) + +**D1. Create `tests/features/client_tracing.feature`** (~50 lines) + +Six scenarios covering: connect span with peer address, send span with frame +size, receive span with result, per-command timing emission, timing not emitted +when disabled, close span. + +```gherkin +Feature: Client structured logging and tracing spans + The wireframe client emits tracing spans and structured events around + connect, send, receive, call, streaming, and close operations. + Per-command timing can be enabled via TracingConfig. + + Background: + Given a running echo server for tracing tests + + Scenario: Connect emits a tracing span with the peer address + Given a client with default tracing config + When the client connects to the server + Then the tracing output contains a "client.connect" span + And the tracing span includes the peer address + + Scenario: Send emits a tracing span with frame size + Given a connected tracing client with default config + When the client sends an envelope via the tracing client + Then the tracing output contains a "client.send" span + And the tracing span includes "frame.bytes" + + Scenario: Receive emits a tracing span recording result + Given a connected tracing client with default config + When the client sends and receives via the tracing client + Then the tracing output contains a "client.receive" span + And the tracing span includes "result=ok" + + Scenario: Per-command timing emits elapsed microseconds + Given a connected tracing client with send timing enabled + When the client sends an envelope via the tracing client + Then the tracing output contains "elapsed_us" + + Scenario: Timing is not emitted when disabled + Given a connected tracing client with default config + When the client sends an envelope via the tracing client + Then the tracing output does not contain "elapsed_us" + + Scenario: Close emits a tracing span + Given a connected tracing client with default config + When the tracing client closes the connection + Then the tracing output contains a "client.close" span +``` + +**D2. Create `tests/fixtures/client_tracing.rs`** (~220 lines) + +`ClientTracingWorld` struct with server addr, server handle, client, tracing +config, captured lines (`Arc>>`), and subscriber guard. +Installs a thread-local `tracing` subscriber via +`tracing::subscriber::set_default()` with a `CaptureWriter` that appends +formatted output to the shared vec. + +Methods: `start_echo_server()`, `connect_with_config(TracingConfig)`, +`connect_default()`, `connect_with_send_timing()`, `send_envelope()`, +`send_and_receive()`, `close_connection()`, `assert_output_contains(needle)`, +`assert_output_not_contains(needle)`. + +**D3. Create `tests/steps/client_tracing_steps.rs`** (~110 lines) + +Step definitions using `rstest_bdd_macros::{given, when, then}`. Each step +calls world async methods via `tokio::runtime::Runtime::new()?.block_on()`. All +world parameters named `client_tracing_world` (matching fixture name). + +**D4. Create `tests/scenarios/client_tracing_scenarios.rs`** (~55 lines) + +Six `#[scenario]` functions binding feature scenarios to the fixture. + +**D5. Register BDD modules.** + +- `tests/fixtures/mod.rs`: add `pub mod client_tracing;` +- `tests/steps/mod.rs`: add `mod client_tracing_steps;` +- `tests/scenarios/mod.rs`: add `mod client_tracing_scenarios;` + +**D6. Validate stage D.** + +Same validation commands. All 6 BDD scenarios pass alongside existing tests. + +### Stage E: documentation and cleanup + +**E1. Update `docs/wireframe-client-design.md`.** + +Add "Tracing instrumentation" section after "Request hooks" documenting: span +names and levels per operation, structured fields table, `ResponseStream` +per-frame events, per-command timing mechanism, and design rationale for +`Span::enter()` in async context and `dynamic_span!` macro. + +Add a row to the client configuration reference table: + +```plaintext +| Tracing config | tracing_config(TracingConfig) | +| INFO connect/close, DEBUG data ops, timing off | +| Customize tracing span levels and per-command timing. | +``` + +**E2. Update `docs/users-guide.md`.** + +Add "Client tracing" subsection with builder API examples, `TracingConfig` +usage, span output examples, and per-command timing example. + +**E3. Mark roadmap item done.** + +In `docs/roadmap.md`, change `- [ ] 11.1.2.` to `- [x] 11.1.2.`. + +**E4. Final validation.** + +```bash +set -o pipefail +make fmt 2>&1 | tee /tmp/wireframe-fmt.log; echo "fmt exit: $?" +make markdownlint 2>&1 | tee /tmp/wireframe-mdlint.log; echo "mdlint exit: $?" +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log; echo "check-fmt exit: $?" +make lint 2>&1 | tee /tmp/wireframe-lint.log; echo "lint exit: $?" +make test 2>&1 | tee /tmp/wireframe-test.log; echo "test exit: $?" +``` + +## Concrete steps + +All commands run from the repository root. + +After each stage, run validation: + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log; echo "check-fmt exit: $?" +make lint 2>&1 | tee /tmp/wireframe-lint.log; echo "lint exit: $?" +make test 2>&1 | tee /tmp/wireframe-test.log; echo "test exit: $?" +``` + +Expected: all three exit code 0. + +## Validation and acceptance + +Quality criteria (what "done" means): + +- `make test` passes. New unit tests in `src/client/tests/tracing.rs` and + BDD scenarios in `tests/scenarios/client_tracing_scenarios.rs` all pass. +- `make lint` passes with zero warnings. +- `make check-fmt` passes. +- `make markdownlint` passes. +- All pre-existing client tests pass unchanged (backwards compatibility). +- The `TracingConfig` type and `tracing_config()` builder method are public. +- The `frame_count()` method on `ResponseStream` is public. +- `docs/users-guide.md` documents the new builder methods and `TracingConfig`. +- `docs/wireframe-client-design.md` records the design decisions. +- `docs/roadmap.md` marks 11.1.2 as done. + +## Idempotence and recovery + +All stages are additive. If any stage fails partway, `git stash` or +`git checkout .` reverts to the last known-good state. Re-run from the +beginning of the failed stage. No destructive operations are involved. + +## Artifacts and notes + +**Span hierarchy example — `call()`:** + +```plaintext +client.call [DEBUG] + client.send{frame.bytes=42} [DEBUG] + client.receive{frame.bytes=42} [DEBUG, result=ok] +``` + +**Span hierarchy example — `call_correlated()`:** + +```plaintext +client.call_correlated{correlation_id=1} [DEBUG] + client.send_envelope{correlation_id=1, frame.bytes=42} [DEBUG] + client.receive{frame.bytes=42, result=ok} [DEBUG] +``` + +**Streaming events example:** + +```plaintext +client.call_streaming{correlation_id=1, frame.bytes=42} [DEBUG] + DEBUG stream frame received frame.bytes=38 stream.frames_received=1 correlation_id=1 + DEBUG stream frame received frame.bytes=38 stream.frames_received=2 correlation_id=1 + DEBUG stream terminated stream.frames_total=2 correlation_id=1 +``` + +**Files to create (8 new):** + +| File | Purpose | Est. lines | +| --------------------------------------------- | ------------------------------------------------------ | ---------- | +| `src/client/tracing_config.rs` | `TracingConfig` struct with 14 setter methods | ~115 | +| `src/client/tracing_helpers.rs` | `dynamic_span!` macro, 8 span factories, timing helper | ~130 | +| `src/client/builder/tracing.rs` | `tracing_config()` builder method | ~55 | +| `src/client/tests/tracing.rs` | 15 unit tests (rstest + `tracing-test`) | ~280 | +| `tests/features/client_tracing.feature` | BDD feature file (6 scenarios) | ~50 | +| `tests/fixtures/client_tracing.rs` | BDD world/fixture struct + methods | ~220 | +| `tests/steps/client_tracing_steps.rs` | BDD step definitions | ~110 | +| `tests/scenarios/client_tracing_scenarios.rs` | BDD scenario bindings | ~55 | + +**Files to modify (15):** + +| File | Current lines | Est. new lines | Change | +| --------------------------------- | ------------- | -------------- | --------------------------------------------------------------- | +| `src/client/runtime.rs` | 354 | ~380 | `tracing_config` field, spans in `send`, `call`, `close` | +| `src/client/messaging.rs` | 284 | ~330 | Spans in `send_envelope`, `receive_internal`, `call_correlated` | +| `src/client/streaming.rs` | 150 | ~178 | Span in `call_streaming` | +| `src/client/response_stream.rs` | 168 | ~195 | `frame_count` field, events in `poll_next`/`process_frame` | +| `src/client/builder/core.rs` | 63 | ~67 | `tracing_config` field | +| `src/client/builder/mod.rs` | 65 | ~75 | `tracing_config` in 3 macro arms + `mod tracing` | +| `src/client/builder/connect.rs` | 101 | ~120 | Connect span + timing + thread field | +| `src/client/mod.rs` | 82 | ~86 | Module declarations + re-export | +| `src/client/tests/mod.rs` | 175 | ~176 | `mod tracing;` | +| `tests/fixtures/mod.rs` | ~34 | ~35 | `pub mod client_tracing;` | +| `tests/steps/mod.rs` | ~33 | ~34 | `mod client_tracing_steps;` | +| `tests/scenarios/mod.rs` | ~36 | ~37 | `mod client_tracing_scenarios;` | +| `docs/wireframe-client-design.md` | 369 | ~400 | "Tracing instrumentation" section | +| `docs/users-guide.md` | 1926 | ~1975 | "Client tracing" subsection | +| `docs/roadmap.md` | ~500 | ~500 | Mark 11.1.2 done | + +Total: 8 new + 15 modified = 23 files. All source files under 400 lines. + +**Line-budget safety (tightest files):** + +| File | Current | Added | New total | Headroom to 400 | +| -------------------- | ------- | ----- | --------- | --------------- | +| `runtime.rs` | 354 | ~26 | ~380 | 20 | +| `messaging.rs` | 284 | ~46 | ~330 | 70 | +| `response_stream.rs` | 168 | ~27 | ~195 | 205 | +| `builder/connect.rs` | 101 | ~19 | ~120 | 280 | + +## Interfaces and dependencies + +No new external crate dependencies. + +### Types to define + +In `src/client/tracing_config.rs`: + +```rust +/// Controls tracing span levels and per-command timing for client operations. +#[derive(Clone, Debug)] +pub struct TracingConfig { + pub(crate) connect_level: Level, + pub(crate) send_level: Level, + pub(crate) receive_level: Level, + pub(crate) call_level: Level, + pub(crate) streaming_level: Level, + pub(crate) close_level: Level, + pub(crate) connect_timing: bool, + pub(crate) send_timing: bool, + pub(crate) receive_timing: bool, + pub(crate) call_timing: bool, + pub(crate) streaming_timing: bool, + pub(crate) close_timing: bool, +} +``` + +### Builder method to define + +In `src/client/builder/tracing.rs`: + +```rust +impl WireframeClientBuilder +where + S: Serializer + Send + Sync, +{ + pub fn tracing_config(mut self, config: TracingConfig) -> Self; +} +``` + +### Helper functions to define + +In `src/client/tracing_helpers.rs`: + +```rust +pub(crate) fn connect_span(config: &TracingConfig, peer_addr: &str) -> Span; +pub(crate) fn send_span(config: &TracingConfig, frame_bytes: usize) -> Span; +pub(crate) fn receive_span(config: &TracingConfig) -> Span; +pub(crate) fn send_envelope_span(config: &TracingConfig, correlation_id: u64, frame_bytes: usize) -> Span; +pub(crate) fn call_span(config: &TracingConfig) -> Span; +pub(crate) fn call_correlated_span(config: &TracingConfig) -> Span; +pub(crate) fn streaming_span(config: &TracingConfig, correlation_id: u64) -> Span; +pub(crate) fn close_span(config: &TracingConfig) -> Span; +pub(crate) fn emit_timing_event(start: Option); +``` + +### Public accessor to define + +In `src/client/response_stream.rs`: + +```rust +impl ResponseStream<'_, P, S, T, C> { + #[must_use] + pub fn frame_count(&self) -> usize; +} +``` diff --git a/docs/roadmap.md b/docs/roadmap.md index 3ecb8c9b..a77864ad 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -493,9 +493,9 @@ document so larger deployments can adopt the library confidently. - [x] 11.1.1. Add middleware hooks for outgoing requests and incoming frames so metrics, retries, and authentication tokens can be injected symmetrically with server middleware. -- [ ] 11.1.2. Provide structured logging and tracing spans around connect, - send, receive, and stream lifecycle events, plus configuration for - per-command timing. +- [x] 11.1.2. Provide structured logging and tracing spans around connect, + send, receive, call, stream, and close lifecycle events, plus configuration + for per-command timing. ### 11.2. Connection pooling and concurrency diff --git a/docs/users-guide.md b/docs/users-guide.md index 2c54f35b..b9751287 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -1140,6 +1140,7 @@ as `TCP_NODELAY` or buffer size adjustments. | Error hook | `on_error(...)` | Disabled | Transport and decode failures must be routed to observability. | | Before-send hook | `before_send(...)` | Disabled | Inspect or mutate serialized bytes before every outgoing frame. | | After-receive hook | `after_receive(...)` | Disabled | Inspect or mutate raw bytes after every incoming frame is read. | +| Tracing config | `tracing_config(TracingConfig)` | INFO connect/close, DEBUG data ops, timing off | Customize tracing span levels and per-command timing. | ```rust use std::{net::SocketAddr, time::Duration}; @@ -1423,6 +1424,45 @@ println!("sent {} frames", outcome.frames_sent()); `frames_sent()`. The error hook is invoked on all failure paths, consistent with other client send methods.[^53] +### Client tracing + +The client emits `tracing` spans around every operation. Span levels and +per-command timing are configurable via `TracingConfig`, which is passed to the +builder with `tracing_config()`. When no `tracing` subscriber is installed, all +instrumentation is zero-cost. + +Default span levels: `INFO` for lifecycle operations (`connect`, `close`) and +`DEBUG` for data operations (`send`, `receive`, `call`, `call_streaming`). +Per-command timing is disabled by default. + +```rust +use std::net::SocketAddr; + +use tracing::Level; +use wireframe::client::{TracingConfig, WireframeClient}; + +let addr: SocketAddr = "127.0.0.1:7878".parse().expect("valid socket address"); + +// Enable timing for connect and call, set all spans to TRACE level. +let config = TracingConfig::default() + .with_all_levels(Level::TRACE) + .with_connect_timing(true) + .with_call_timing(true); + +let mut client = WireframeClient::builder() + .tracing_config(config) + .connect(addr) + .await?; +``` + +When timing is enabled for an operation, a `DEBUG`-level event recording +`elapsed_us` is emitted when the operation completes (on both success and error +paths). Streaming responses emit per-frame `DEBUG` events with +`stream.frames_received` and a termination event with `stream.frames_total`. + +Clients configured without `tracing_config()` use the default configuration and +behave identically to the pre-tracing API. + ### Client message API with correlation identifiers The client provides envelope-aware messaging APIs that work with the `Packet` diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index 4e652087..067a54f7 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -67,6 +67,7 @@ length‑delimited codec. | Setup hook | `on_connection_setup(...)` | Disabled | Store per-connection state for metrics, auth context, or counters. | | Teardown hook | `on_connection_teardown(...)` | Disabled | Flush metrics and release per-connection resources on `close()`. | | Error hook | `on_error(...)` | Disabled | Centralize client transport/decode/correlation error reporting. | +| Tracing config | `tracing_config(TracingConfig)` | INFO connect/close, DEBUG data ops, timing off | Customize tracing span levels and per-command timing. | ### Request/response helpers @@ -241,6 +242,63 @@ can universally inspect or modify the serialized payload. `after_receive` is wired into `receive_internal()` (messaging.rs) and `ResponseStream::poll_next()` (response_stream.rs). +### Tracing instrumentation + +Every client operation is wrapped in a `tracing` span with structured fields. +Span levels are configurable per-operation via `TracingConfig`, with sensible +defaults: `INFO` for lifecycle operations (connect, close) and `DEBUG` for +high-frequency data operations (send, receive, call, streaming). When no +`tracing` subscriber is installed, all instrumentation is zero-cost. + +**Span hierarchy and field conventions:** + +| Operation | Span name | Structured fields | +| ------------------- | ------------------------ | ------------------------------------------ | +| `connect()` | `client.connect` | `peer.addr` | +| `send()` | `client.send` | `frame.bytes` | +| `receive()` | `client.receive` | `frame.bytes` (deferred), `result` | +| `send_envelope()` | `client.send_envelope` | `correlation_id`, `frame.bytes` | +| `call()` | `client.call` | `result` (deferred) | +| `call_correlated()` | `client.call_correlated` | `correlation_id` (deferred), `result` | +| `call_streaming()` | `client.call_streaming` | `correlation_id`, `frame.bytes` (deferred) | +| `close()` | `client.close` | (none) | + +**Per-frame streaming events**: `ResponseStream` emits `tracing::debug!` events +(not spans) on each received data frame and on stream termination: + +- `stream frame received` with `frame.bytes`, `stream.frames_received`, and + `correlation_id`. +- `stream terminated` with `stream.frames_total` and `correlation_id`. + +**Per-command timing**: When enabled via `TracingConfig::with_*_timing(true)`, +an additional `tracing::debug!` event recording `elapsed_us` is emitted when +the operation completes. Timing events fire on both success and error paths. +Timing is disabled by default for all operations. + +**Design rationale — async-safe span instrumentation**: `Span::enter()` guards +must not be held across `.await` points because a multi-threaded runtime may +poll the future on a different thread, causing the span to be "entered" on the +wrong thread. Client methods use `tracing::Instrument::instrument(span)` to +wrap async futures so the span is entered only while the future is polled and +exited between polls. For purely synchronous sections, +`Span::in_scope(|| { ... })` is the correct pattern. See the +[`tracing::Instrument`][instrument-docs] trait and the +[`#[tracing::instrument]`][attr-docs] attribute for further guidance. + +[instrument-docs]: https://docs.rs/tracing/latest/tracing/trait.Instrument.html +[attr-docs]: https://docs.rs/tracing/latest/tracing/attr.instrument.html + +**Design rationale — `dynamic_span!` macro**: The `tracing` crate requires +compile-time level constants in `span!` macros. To support user-configurable +levels per operation, a `macro_rules!` macro in `tracing_helpers.rs` matches on +the five `Level` variants, delegating to the corresponding +`tracing::_span!` macro per branch. Each branch has static metadata +while branch selection is dynamic. + +**Design rationale — `ResponseStream` events, not spans**: Creating spans +inside `poll_next` is problematic because it is synchronous and called many +times. Events are lightweight and appropriate for per-frame diagnostics. + ### Preamble support The client builder now supports an optional preamble exchange. Use diff --git a/src/client/builder/connect.rs b/src/client/builder/connect.rs index a476f87b..b30986ea 100644 --- a/src/client/builder/connect.rs +++ b/src/client/builder/connect.rs @@ -1,14 +1,20 @@ //! Connection establishment for `WireframeClientBuilder`. -use std::{net::SocketAddr, sync::atomic::AtomicU64}; +use std::{net::SocketAddr, sync::atomic::AtomicU64, time::Instant}; use bincode::Encode; use tokio::net::TcpSocket; use tokio_util::codec::Framed; +use tracing::Instrument; use super::WireframeClientBuilder; use crate::{ - client::{ClientError, WireframeClient, preamble_exchange::perform_preamble_exchange}, + client::{ + ClientError, + WireframeClient, + preamble_exchange::perform_preamble_exchange, + tracing_helpers::{connect_span, emit_timing_event}, + }, rewind_stream::RewindStream, serializer::Serializer, }; @@ -51,6 +57,23 @@ where pub async fn connect( self, addr: SocketAddr, + ) -> Result, C>, ClientError> { + let span = connect_span(&self.tracing_config, &addr.to_string()); + let timing_start = self.tracing_config.connect_timing.then(Instant::now); + + async { + let result = self.connect_inner(addr).await; + emit_timing_event(timing_start); + result + } + .instrument(span) + .await + } + + /// Perform socket creation, connection, preamble exchange, and codec setup. + async fn connect_inner( + self, + addr: SocketAddr, ) -> Result, C>, ClientError> { let socket = if addr.is_ipv4() { TcpSocket::new_v4()? @@ -95,6 +118,7 @@ where on_disconnect: self.lifecycle_hooks.on_disconnect, on_error: self.lifecycle_hooks.on_error, request_hooks: self.request_hooks, + tracing_config: self.tracing_config, correlation_counter: AtomicU64::new(1), }) } diff --git a/src/client/builder/core.rs b/src/client/builder/core.rs index af177792..c07bfca9 100644 --- a/src/client/builder/core.rs +++ b/src/client/builder/core.rs @@ -6,6 +6,7 @@ use crate::{ SocketOptions, hooks::{LifecycleHooks, RequestHooks}, preamble_exchange::PreambleConfig, + tracing_config::TracingConfig, }, serializer::BincodeSerializer, }; @@ -32,6 +33,7 @@ pub struct WireframeClientBuilder { pub(crate) preamble_config: Option>, pub(crate) lifecycle_hooks: LifecycleHooks, pub(crate) request_hooks: RequestHooks, + pub(crate) tracing_config: TracingConfig, } impl WireframeClientBuilder { @@ -54,6 +56,7 @@ impl WireframeClientBuilder { preamble_config: None, lifecycle_hooks: LifecycleHooks::default(), request_hooks: RequestHooks::default(), + tracing_config: TracingConfig::default(), } } } diff --git a/src/client/builder/mod.rs b/src/client/builder/mod.rs index ae55c18a..c6e0e365 100644 --- a/src/client/builder/mod.rs +++ b/src/client/builder/mod.rs @@ -28,6 +28,7 @@ macro_rules! builder_field_update { preamble_config: $self.preamble_config, lifecycle_hooks: $self.lifecycle_hooks, request_hooks: $self.request_hooks, + tracing_config: $self.tracing_config, } }; // Preamble change: preserves S and C, moves lifecycle_hooks unchanged @@ -39,6 +40,7 @@ macro_rules! builder_field_update { preamble_config: $value, lifecycle_hooks: $self.lifecycle_hooks, request_hooks: $self.request_hooks, + tracing_config: $self.tracing_config, } }; // Lifecycle hooks change: preserves S and P, changes C @@ -50,6 +52,7 @@ macro_rules! builder_field_update { preamble_config: $self.preamble_config, lifecycle_hooks: $value, request_hooks: $self.request_hooks, + tracing_config: $self.tracing_config, } }; } @@ -61,5 +64,6 @@ mod lifecycle; mod preamble; mod request_hooks; mod serializer; +mod tracing; pub use core::WireframeClientBuilder; diff --git a/src/client/builder/tracing.rs b/src/client/builder/tracing.rs new file mode 100644 index 00000000..7bb94e97 --- /dev/null +++ b/src/client/builder/tracing.rs @@ -0,0 +1,35 @@ +//! Tracing configuration builder method for [`WireframeClientBuilder`]. + +use super::WireframeClientBuilder; +use crate::{client::tracing_config::TracingConfig, serializer::Serializer}; + +impl WireframeClientBuilder +where + S: Serializer + Send + Sync, +{ + /// Configure tracing instrumentation for the client. + /// + /// The [`TracingConfig`] controls which operations emit tracing spans, + /// at what level, and whether per-command elapsed-time events are + /// recorded. + /// + /// When not called, the client uses [`TracingConfig::default()`], which + /// emits `INFO` spans for lifecycle operations (`connect`, `close`) and + /// `DEBUG` spans for data operations (`send`, `receive`, `call`, + /// `call_streaming`). Timing is disabled by default. + /// + /// # Examples + /// + /// ``` + /// use wireframe::client::{TracingConfig, WireframeClientBuilder}; + /// + /// let config = TracingConfig::default().with_all_timing(true); + /// let builder = WireframeClientBuilder::new().tracing_config(config); + /// let _ = builder; + /// ``` + #[must_use] + pub fn tracing_config(mut self, config: TracingConfig) -> Self { + self.tracing_config = config; + self + } +} diff --git a/src/client/messaging.rs b/src/client/messaging.rs index 931de2c5..70da00a7 100644 --- a/src/client/messaging.rs +++ b/src/client/messaging.rs @@ -3,12 +3,17 @@ //! This module provides methods for sending and receiving envelopes with //! automatic correlation ID generation and validation. -use std::sync::atomic::Ordering; +use std::{sync::atomic::Ordering, time::Instant}; use bytes::Bytes; use futures::{SinkExt, StreamExt}; +use tracing::Instrument; -use super::{ClientError, runtime::ClientStream}; +use super::{ + ClientError, + runtime::ClientStream, + tracing_helpers::{call_correlated_span, emit_timing_event, receive_span, send_envelope_span}, +}; use crate::{ app::Packet, message::{DecodeWith, EncodeWith}, @@ -122,16 +127,26 @@ where envelope.set_correlation_id(Some(correlation_id)); } + let timing_start = self.tracing_config.send_timing.then(Instant::now); let mut bytes = match self.serializer.serialize(&envelope) { Ok(bytes) => bytes, Err(e) => { let err = ClientError::Serialize(e); + emit_timing_event(timing_start); self.invoke_error_hook(&err).await; return Err(err); } }; self.invoke_before_send_hooks(&mut bytes); - if let Err(e) = self.framed.send(Bytes::from(bytes)).await { + let span = send_envelope_span(&self.tracing_config, correlation_id, bytes.len()); + let send_result = async { + let result = self.framed.send(Bytes::from(bytes)).await; + emit_timing_event(timing_start); + result + } + .instrument(span) + .await; + if let Err(e) = send_result { let err = ClientError::from(e); self.invoke_error_hook(&err).await; return Err(err); @@ -217,8 +232,43 @@ where where P: Packet + EncodeWith + DecodeWith, { - let correlation_id = self.send_envelope(request).await?; - let response: P = self.receive_envelope().await?; + let span = call_correlated_span(&self.tracing_config); + let timing_start = self.tracing_config.call_timing.then(Instant::now); + + self.call_correlated_inner(request, &span, timing_start) + .instrument(span.clone()) + .await + } + + /// Execute the correlated call within an active span. + async fn call_correlated_inner

( + &mut self, + request: P, + span: &tracing::Span, + timing_start: Option, + ) -> Result + where + P: Packet + EncodeWith + DecodeWith, + { + let correlation_id = match self.send_envelope(request).await { + Ok(id) => id, + Err(err) => { + // Error hook already invoked by send_envelope. + span.record("result", "err"); + emit_timing_event(timing_start); + return Err(err); + } + }; + span.record("correlation_id", correlation_id); + let response: P = match self.receive_envelope().await { + Ok(response) => response, + Err(err) => { + // Error hook already invoked by receive_internal. + span.record("result", "err"); + emit_timing_event(timing_start); + return Err(err); + } + }; // Validate correlation ID matches. let response_correlation_id = response.correlation_id(); @@ -227,37 +277,50 @@ where expected: Some(correlation_id), received: response_correlation_id, }; - self.invoke_error_hook(&err).await; - return Err(err); + return Err(self.traced_error(span, timing_start, err).await); } + Self::traced_ok(span, timing_start); Ok(response) } /// Internal helper for receiving and deserializing a frame. pub(crate) async fn receive_internal>(&mut self) -> Result { + let span = receive_span(&self.tracing_config); + let timing_start = self.tracing_config.receive_timing.then(Instant::now); + + self.receive_frame(&span, timing_start) + .instrument(span.clone()) + .await + } + + /// Receive and deserialize a single frame within an active span. + async fn receive_frame>( + &mut self, + span: &tracing::Span, + timing_start: Option, + ) -> Result { let Some(frame) = self.framed.next().await else { let err = ClientError::disconnected(); - self.invoke_error_hook(&err).await; - return Err(err); + return Err(self.traced_error(span, timing_start, err).await); }; let mut bytes = match frame { Ok(bytes) => bytes, Err(e) => { let err = ClientError::from(e); - self.invoke_error_hook(&err).await; - return Err(err); + return Err(self.traced_error(span, timing_start, err).await); } }; + span.record("frame.bytes", bytes.len()); self.invoke_after_receive_hooks(&mut bytes); let (message, _consumed) = match self.serializer.deserialize(&bytes) { Ok(result) => result, Err(e) => { let err = ClientError::decode(e); - self.invoke_error_hook(&err).await; - return Err(err); + return Err(self.traced_error(span, timing_start, err).await); } }; + Self::traced_ok(span, timing_start); Ok(message) } @@ -268,6 +331,26 @@ where } } + /// Record a span result, emit timing, invoke the error hook, and return + /// the error for early-return paths. + pub(crate) async fn traced_error( + &self, + span: &tracing::Span, + timing_start: Option, + err: ClientError, + ) -> ClientError { + span.record("result", "err"); + emit_timing_event(timing_start); + self.invoke_error_hook(&err).await; + err + } + + /// Record a successful span result and emit timing. + pub(crate) fn traced_ok(span: &tracing::Span, timing_start: Option) { + span.record("result", "ok"); + emit_timing_event(timing_start); + } + /// Invoke all registered before-send hooks in registration order. pub(crate) fn invoke_before_send_hooks(&self, bytes: &mut Vec) { for hook in &self.request_hooks.before_send { diff --git a/src/client/mod.rs b/src/client/mod.rs index c00fb959..d7d3ec73 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -23,6 +23,8 @@ mod runtime; mod send_streaming; mod socket_option_methods; mod streaming; +mod tracing_config; +mod tracing_helpers; pub use builder::WireframeClientBuilder; pub use codec_config::ClientCodecConfig; @@ -38,6 +40,7 @@ pub use hooks::{ pub use response_stream::ResponseStream; pub use runtime::WireframeClient; pub use send_streaming::{SendStreamingConfig, SendStreamingOutcome}; +pub use tracing_config::TracingConfig; /// Handler invoked after the client successfully writes its preamble. /// diff --git a/src/client/response_stream.rs b/src/client/response_stream.rs index 8d707d46..217f29c3 100644 --- a/src/client/response_stream.rs +++ b/src/client/response_stream.rs @@ -67,6 +67,7 @@ where client: &'a mut super::WireframeClient, correlation_id: u64, terminated: bool, + frame_count: usize, _phantom: PhantomData P>, } @@ -89,6 +90,7 @@ where client, correlation_id, terminated: false, + frame_count: 0, _phantom: PhantomData, } } @@ -101,6 +103,24 @@ where #[must_use] pub fn is_terminated(&self) -> bool { self.terminated } + /// Returns the number of data frames received so far. + #[must_use] + pub fn frame_count(&self) -> usize { self.frame_count } + + /// Increment the frame counter and emit a per-frame tracing event for + /// successfully decoded data frames. + fn on_frame_received(&mut self, frame_bytes: usize, result: Option<&Result>) { + if let Some(Ok(_)) = result { + self.frame_count = self.frame_count.saturating_add(1); + tracing::debug!( + frame.bytes = frame_bytes, + stream.frames_received = self.frame_count, + correlation_id = self.correlation_id, + "stream frame received" + ); + } + } + /// Deserialize raw bytes and validate the resulting packet against the /// terminator predicate and expected correlation identifier. fn process_frame(&mut self, bytes: &[u8]) -> Option> { @@ -116,6 +136,11 @@ where // without a per-request correlation stamp are handled cleanly. if packet.is_stream_terminator() { self.terminated = true; + tracing::debug!( + stream.frames_total = self.frame_count, + correlation_id = self.correlation_id, + "stream terminated" + ); return None; } @@ -160,8 +185,11 @@ where Poll::Ready(Some(Err(ClientError::from(e)))) } Poll::Ready(Some(Ok(mut bytes))) => { + let frame_bytes = bytes.len(); this.client.invoke_after_receive_hooks(&mut bytes); - Poll::Ready(this.process_frame(&bytes)) + let result = this.process_frame(&bytes); + this.on_frame_received(frame_bytes, result.as_ref()); + Poll::Ready(result) } } } diff --git a/src/client/runtime.rs b/src/client/runtime.rs index 59843023..8509e5a0 100644 --- a/src/client/runtime.rs +++ b/src/client/runtime.rs @@ -1,6 +1,6 @@ //! Wireframe client runtime implementation. -use std::{fmt, sync::atomic::AtomicU64}; +use std::{fmt, sync::atomic::AtomicU64, time::Instant}; use bytes::Bytes; use futures::SinkExt; @@ -9,12 +9,15 @@ use tokio::{ net::TcpStream, }; use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use tracing::Instrument; use super::{ ClientCodecConfig, ClientError, WireframeClientBuilder, hooks::{ClientConnectionTeardownHandler, ClientErrorHandler, RequestHooks}, + tracing_config::TracingConfig, + tracing_helpers::{call_span, close_span, emit_timing_event, send_span}, }; use crate::{ message::{DecodeWith, EncodeWith}, @@ -66,6 +69,8 @@ where pub(crate) on_error: Option, /// Hooks invoked on every outgoing and incoming frame. pub(crate) request_hooks: RequestHooks, + /// Tracing configuration for span levels and per-command timing. + pub(crate) tracing_config: TracingConfig, /// Counter for generating unique correlation identifiers. pub(crate) correlation_counter: AtomicU64, } @@ -139,16 +144,26 @@ where /// # } /// ``` pub async fn send>(&mut self, message: &M) -> Result<(), ClientError> { + let timing_start = self.tracing_config.send_timing.then(Instant::now); let mut bytes = match self.serializer.serialize(message) { Ok(bytes) => bytes, Err(e) => { let err = ClientError::Serialize(e); + emit_timing_event(timing_start); self.invoke_error_hook(&err).await; return Err(err); } }; self.invoke_before_send_hooks(&mut bytes); - if let Err(e) = self.framed.send(Bytes::from(bytes)).await { + let span = send_span(&self.tracing_config, bytes.len()); + let send_result = async { + let result = self.framed.send(Bytes::from(bytes)).await; + emit_timing_event(timing_start); + result + } + .instrument(span) + .await; + if let Err(e) = send_result { let err = ClientError::from(e); self.invoke_error_hook(&err).await; return Err(err); @@ -232,8 +247,28 @@ where &mut self, request: &Req, ) -> Result { - self.send(request).await?; - self.receive().await + let span = call_span(&self.tracing_config); + let timing_start = self.tracing_config.call_timing.then(Instant::now); + + async { + if let Err(err) = self.send(request).await { + // Error hook already invoked by send. + span.record("result", "err"); + emit_timing_event(timing_start); + return Err(err); + } + let result = self.receive().await; + if result.is_ok() { + Self::traced_ok(&span, timing_start); + } else { + // Error hook already invoked by receive_internal. + span.record("result", "err"); + emit_timing_event(timing_start); + } + result + } + .instrument(span.clone()) + .await } /// Inspect the configured codec settings. @@ -343,12 +378,22 @@ where /// # } /// ``` pub async fn close(mut self) { - // Flush pending frames and send EOF before teardown. - // Ignore errors since we're closing anyway. - let _ = self.framed.close().await; + let span = close_span(&self.tracing_config); + let timing_start = self.tracing_config.close_timing.then(Instant::now); - if let (Some(state), Some(handler)) = (self.connection_state.take(), &self.on_disconnect) { - handler(state).await; + async { + // Flush pending frames and send EOF before teardown. + // Ignore errors since we're closing anyway. + let _ = self.framed.close().await; + + if let (Some(state), Some(handler)) = + (self.connection_state.take(), &self.on_disconnect) + { + handler(state).await; + } + emit_timing_event(timing_start); } + .instrument(span) + .await; } } diff --git a/src/client/streaming.rs b/src/client/streaming.rs index d8277f64..66e74db5 100644 --- a/src/client/streaming.rs +++ b/src/client/streaming.rs @@ -5,12 +5,18 @@ //! [`ResponseStream`](super::ResponseStream) that yields data frames until the //! protocol's end-of-stream terminator arrives. -use std::sync::atomic::Ordering; +use std::{sync::atomic::Ordering, time::Instant}; use bytes::Bytes; use futures::SinkExt; +use tracing::Instrument; -use super::{ClientError, ResponseStream, runtime::ClientStream}; +use super::{ + ClientError, + ResponseStream, + runtime::ClientStream, + tracing_helpers::{emit_timing_event, streaming_span}, +}; use crate::{ app::Packet, message::{DecodeWith, EncodeWith}, @@ -85,16 +91,28 @@ where request.set_correlation_id(Some(correlation_id)); } + let span = streaming_span(&self.tracing_config, correlation_id); + let timing_start = self.tracing_config.streaming_timing.then(Instant::now); + let mut bytes = match self.serializer.serialize(&request) { Ok(bytes) => bytes, Err(e) => { let err = ClientError::Serialize(e); + emit_timing_event(timing_start); self.invoke_error_hook(&err).await; return Err(err); } }; self.invoke_before_send_hooks(&mut bytes); - if let Err(e) = self.framed.send(Bytes::from(bytes)).await { + span.record("frame.bytes", bytes.len()); + let send_result = async { + let result = self.framed.send(Bytes::from(bytes)).await; + emit_timing_event(timing_start); + result + } + .instrument(span) + .await; + if let Err(e) = send_result { let err = ClientError::from(e); self.invoke_error_hook(&err).await; return Err(err); diff --git a/src/client/tests/mod.rs b/src/client/tests/mod.rs index 60c25bc0..9d519d39 100644 --- a/src/client/tests/mod.rs +++ b/src/client/tests/mod.rs @@ -10,6 +10,8 @@ mod send_streaming_infra; mod streaming; mod streaming_infra; mod streaming_parity; +mod tracing; +mod tracing_streaming; use std::time::Duration; diff --git a/src/client/tests/tracing.rs b/src/client/tests/tracing.rs new file mode 100644 index 00000000..a32ffdba --- /dev/null +++ b/src/client/tests/tracing.rs @@ -0,0 +1,361 @@ +//! Unit tests for client tracing spans and per-command timing. +//! +//! The `tracing-test` subscriber captures formatted event output; span names +//! appear as context prefixes. Tests enable per-command timing so an event is +//! emitted within each span, making the span name visible in captured output. + +use bytes::Bytes; +use futures::{SinkExt, StreamExt}; +use rstest::rstest; +use tokio::net::TcpListener; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use tracing_test::traced_test; +use wireframe_testing::{ServerMode, process_frame}; + +use crate::{ + app::Envelope, + client::{ClientError, TracingConfig, WireframeClient}, + rewind_stream::RewindStream, + serializer::BincodeSerializer, +}; + +/// Concrete client type returned by `builder().connect()` in tests. +type TestClient = WireframeClient>; + +/// Spawn a test echo server that deserialises envelopes and echoes them back. +async fn spawn_echo_server() -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind listener"); + let addr = listener.local_addr().expect("listener addr"); + + let handle = tokio::spawn(async move { + let (stream, _) = listener.accept().await.expect("accept client"); + let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); + + while let Some(Ok(bytes)) = framed.next().await { + let Some(response_bytes) = process_frame(ServerMode::Echo, &bytes) else { + break; + }; + if framed.send(Bytes::from(response_bytes)).await.is_err() { + break; + } + } + }); + + (addr, handle) +} + +/// Spawn an echo server, connect a client with the given tracing config, +/// run an async closure against it, then tear down the server. +/// +/// The closure takes ownership of the client so that operations such as +/// `close()` (which consumes `self`) work without lifetime gymnastics. +async fn with_echo_client(config: TracingConfig, f: F) +where + F: FnOnce(TestClient, std::net::SocketAddr) -> Fut, + Fut: std::future::Future, +{ + let (addr, server) = spawn_echo_server().await; + let client = WireframeClient::builder() + .tracing_config(config) + .connect(addr) + .await + .expect("connect"); + f(client, addr).await; + server.abort(); +} + +/// Return a closure that asserts at least one log line contains +/// `span_name` and every string in `required_fields`. Pass the +/// returned closure to `logs_assert`. +pub(super) fn span_assertion( + span_name: &str, + required_fields: &[&str], +) -> impl Fn(&[&str]) -> Result<(), String> + 'static { + let span = span_name.to_owned(); + let fields: Vec = required_fields.iter().map(|s| (*s).to_owned()).collect(); + move |lines: &[&str]| { + lines + .iter() + .find(|line| line.contains(&span) && fields.iter().all(|f| line.contains(f.as_str()))) + .map(|_| ()) + .ok_or_else(|| format!("{span} not found in:\n{}", lines.join("\n"))) + } +} + +/// Run a test operation against an echo client and assert the expected +/// span appears in logs with the given required fields. +/// +/// This is a macro rather than a function because `logs_assert` is a +/// scope-bound local injected by `#[traced_test]` into each test body. +macro_rules! test_span_emission { + ($config:expr, $span_name:expr, $required_fields:expr, $operation:expr $(,)?) => { + with_echo_client($config, $operation).await; + logs_assert(span_assertion($span_name, $required_fields)); + }; +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn connect_emits_span_with_peer_address() { + let captured_addr = std::sync::OnceLock::new(); + with_echo_client( + TracingConfig::default().with_connect_timing(true), + |_client, addr| { + captured_addr.set(addr.to_string()).expect("set addr"); + async {} + }, + ) + .await; + + // The peer address is dynamic, so we capture it from the closure. + let addr_str = captured_addr.get().expect("addr captured"); + logs_assert(span_assertion("client.connect", &[addr_str])); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn send_emits_span_with_frame_bytes() { + test_span_emission!( + TracingConfig::default().with_send_timing(true), + "client.send", + &["frame.bytes"], + |mut client, _addr| async move { + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + client.send(&envelope).await.expect("send"); + }, + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn receive_emits_span_with_result() { + test_span_emission!( + TracingConfig::default().with_receive_timing(true), + "client.receive", + &["result=\"ok\""], + |mut client, _addr| async move { + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + client.send(&envelope).await.expect("send"); + let _response: Envelope = client.receive().await.expect("receive"); + }, + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn call_emits_wrapping_span() { + test_span_emission!( + TracingConfig::default().with_call_timing(true), + "client.call", + &[], + |mut client, _addr| async move { + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + let _response: Envelope = client.call(&envelope).await.expect("call"); + }, + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn call_correlated_emits_span_with_correlation_id() { + test_span_emission!( + TracingConfig::default().with_call_timing(true), + "client.call_correlated", + &["correlation_id"], + |mut client, _addr| async move { + let request = Envelope::new(1, None, vec![1, 2, 3]); + let _response: Envelope = client + .call_correlated(request) + .await + .expect("call_correlated"); + }, + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn send_envelope_emits_span_with_correlation_id_and_frame_bytes() { + test_span_emission!( + TracingConfig::default().with_send_timing(true), + "client.send_envelope", + &["correlation_id", "frame.bytes"], + |mut client, _addr| async move { + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + let _cid = client.send_envelope(envelope).await.expect("send_envelope"); + }, + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn close_emits_span() { + with_echo_client( + TracingConfig::default().with_close_timing(true), + |client, _addr| async move { + client.close().await; + }, + ) + .await; + + logs_assert(span_assertion("client.close", &[])); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn call_correlated_error_records_result_err_and_emits_timing() { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind listener"); + let addr = listener.local_addr().expect("listener addr"); + + // Accept one frame then drop the connection so that the correlated + // receive fails with a disconnect error. + let accept = tokio::spawn(async move { + let (stream, _) = listener.accept().await.expect("accept"); + let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); + let _frame = framed.next().await; + drop(framed); + }); + + let mut client = WireframeClient::builder() + .tracing_config(TracingConfig::default().with_call_timing(true)) + .connect(addr) + .await + .expect("connect"); + + let request = Envelope::new(1, None, vec![1, 2, 3]); + let result: Result = client.call_correlated(request).await; + assert!( + result.is_err(), + "call_correlated should fail after disconnect" + ); + + accept.await.expect("join accept"); + + // The span must appear with result="err" and timing must be emitted. + logs_assert(span_assertion( + "client.call_correlated", + &["result=\"err\"", "elapsed_us"], + )); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn receive_error_records_result_err() { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind listener"); + let addr = listener.local_addr().expect("listener addr"); + + let accept = tokio::spawn(async move { + let (stream, _) = listener.accept().await.expect("accept"); + drop(stream); + }); + + let mut client = WireframeClient::builder() + .tracing_config(TracingConfig::default().with_receive_timing(true)) + .connect(addr) + .await + .expect("connect"); + + accept.await.expect("join accept"); + + let result: Result = client.receive().await; + assert!(result.is_err(), "receive should fail after disconnect"); + + logs_assert(span_assertion("client.receive", &["result=\"err\""])); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn timing_disabled_by_default() { + with_echo_client(TracingConfig::default(), |mut client, _addr| async move { + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + let _response: Envelope = client.call(&envelope).await.expect("call"); + }) + .await; + + assert!( + !logs_contain("elapsed_us"), + "elapsed_us should not appear when timing is disabled" + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn timing_enabled_emits_elapsed_us_for_send() { + test_span_emission!( + TracingConfig::default().with_send_timing(true), + "elapsed_us", + &[], + |mut client, _addr| async move { + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + client.send(&envelope).await.expect("send"); + }, + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn timing_enabled_for_connect() { + test_span_emission!( + TracingConfig::default().with_connect_timing(true), + "elapsed_us", + &[], + |_client, _addr| async {}, + ); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn all_timing_convenience_enables_all_operations() { + with_echo_client( + TracingConfig::default().with_all_timing(true), + |mut client, _addr| async move { + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + let _response: Envelope = client.call(&envelope).await.expect("call"); + }, + ) + .await; + + // At minimum: connect + send + receive + call = 4 timing events. + logs_assert(|lines: &[&str]| { + let count = lines.iter().filter(|l| l.contains("elapsed_us")).count(); + if count >= 4 { + Ok(()) + } else { + Err(format!("expected >=4 elapsed_us events, found {count}")) + } + }); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn default_config_is_backwards_compatible() { + // No tracing_config() call — uses the default. Verifies no panic + // occurs and basic operations succeed with default configuration. + let (addr, server) = spawn_echo_server().await; + let mut client = WireframeClient::builder() + .connect(addr) + .await + .expect("connect"); + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + let _response: Envelope = client.call(&envelope).await.expect("call"); + server.abort(); +} diff --git a/src/client/tests/tracing_streaming.rs b/src/client/tests/tracing_streaming.rs new file mode 100644 index 00000000..a081e0dc --- /dev/null +++ b/src/client/tests/tracing_streaming.rs @@ -0,0 +1,104 @@ +//! Tracing tests for streaming client operations. +//! +//! These tests verify that `call_streaming` and the response stream emit the +//! expected tracing spans and events. They are separated from the core tracing +//! tests because they depend on the streaming test infrastructure. + +use futures::StreamExt; +use rstest::rstest; +use tracing_test::traced_test; + +use super::{ + streaming_infra::{ + CorrelationId, + MessageId, + Payload, + TestStreamEnvelope, + setup_streaming_test, + spawn_test_server, + }, + tracing::span_assertion, +}; +use crate::client::{TracingConfig, WireframeClient}; + +#[rstest] +#[traced_test] +#[tokio::test] +async fn call_streaming_emits_span_with_correlation_id() { + let cid = CorrelationId::new(42); + let frames = vec![ + TestStreamEnvelope::data(MessageId::new(1), cid, Payload::new(vec![10])), + TestStreamEnvelope::terminator(cid), + ]; + + let server = spawn_test_server(frames, false).await.expect("server"); + let mut client = WireframeClient::builder() + .tracing_config(TracingConfig::default().with_streaming_timing(true)) + .connect(server.addr) + .await + .expect("connect"); + + let request = TestStreamEnvelope::data(MessageId::new(99), cid, Payload::new(vec![])); + let mut stream = client + .call_streaming::(request) + .await + .expect("call_streaming"); + + // Consume the stream — frame events carry the call_streaming span context. + while let Some(result) = stream.next().await { + let _frame = result.expect("data frame"); + } + + // With streaming timing enabled, the elapsed_us event fires within + // the client.call_streaming span, making the span name visible. + logs_assert(span_assertion("client.call_streaming", &["correlation_id"])); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn response_stream_emits_frame_events() { + let cid = CorrelationId::new(42); + let frames = vec![ + TestStreamEnvelope::data(MessageId::new(1), cid, Payload::new(vec![10])), + TestStreamEnvelope::data(MessageId::new(2), cid, Payload::new(vec![20])), + TestStreamEnvelope::data(MessageId::new(3), cid, Payload::new(vec![30])), + TestStreamEnvelope::terminator(cid), + ]; + + let (mut client, _server) = setup_streaming_test(frames).await.expect("setup"); + + let request = TestStreamEnvelope::data(MessageId::new(99), cid, Payload::new(vec![])); + let mut stream = client + .call_streaming::(request) + .await + .expect("call_streaming"); + + while let Some(result) = stream.next().await { + let _frame = result.expect("data frame"); + } + + // Only data frames increment the counter; the terminator does not. + assert_eq!( + stream.frame_count(), + 3, + "frame_count should only include successfully decoded data frames" + ); + + logs_assert(|lines: &[&str]| { + let frame_events = lines + .iter() + .filter(|line| line.contains("stream frame received")) + .count(); + if frame_events < 3 { + return Err(format!( + "expected at least 3 'stream frame received' events, found {frame_events}" + )); + } + lines + .iter() + .find(|line| line.contains("stream terminated")) + .map(|_| ()) + .ok_or_else(|| "'stream terminated' event not found".to_string()) + }); +} diff --git a/src/client/tracing_config.rs b/src/client/tracing_config.rs new file mode 100644 index 00000000..568c0e45 --- /dev/null +++ b/src/client/tracing_config.rs @@ -0,0 +1,375 @@ +//! Tracing configuration for wireframe client operations. +//! +//! [`TracingConfig`] controls which client operations emit tracing spans and +//! whether per-command elapsed-time events are recorded. + +use tracing::Level; + +/// Controls tracing span levels and per-command timing for client operations. +/// +/// By default, lifecycle operations (`connect`, `close`) emit spans at +/// `INFO` level. High-frequency operations (`send`, `receive`, `call`, +/// `call_streaming`) emit spans at `DEBUG` level. Per-command timing is +/// disabled for all operations by default. +/// +/// Spans are always created at the configured level. When no `tracing` +/// subscriber is installed, span creation is a no-op (zero-cost). When +/// per-command timing is enabled for an operation, an additional event +/// recording `elapsed_us` is emitted when the operation completes. +/// +/// # Examples +/// +/// ``` +/// use tracing::Level; +/// use wireframe::client::TracingConfig; +/// +/// // Enable timing for connect and call operations only. +/// let config = TracingConfig::default() +/// .with_connect_timing(true) +/// .with_call_timing(true); +/// let _ = config; +/// +/// // Set all operations to TRACE level. +/// let verbose = TracingConfig::default() +/// .with_all_levels(Level::TRACE) +/// .with_all_timing(true); +/// let _ = verbose; +/// ``` +#[expect( + clippy::struct_excessive_bools, + reason = "six independent on/off timing flags — one per operation category" +)] +#[derive(Clone, Debug)] +pub struct TracingConfig { + pub(crate) connect_level: Level, + pub(crate) send_level: Level, + pub(crate) receive_level: Level, + pub(crate) call_level: Level, + pub(crate) streaming_level: Level, + pub(crate) close_level: Level, + pub(crate) connect_timing: bool, + pub(crate) send_timing: bool, + pub(crate) receive_timing: bool, + pub(crate) call_timing: bool, + pub(crate) streaming_timing: bool, + pub(crate) close_timing: bool, +} + +impl Default for TracingConfig { + fn default() -> Self { + Self { + connect_level: Level::INFO, + send_level: Level::DEBUG, + receive_level: Level::DEBUG, + call_level: Level::DEBUG, + streaming_level: Level::DEBUG, + close_level: Level::INFO, + connect_timing: false, + send_timing: false, + receive_timing: false, + call_timing: false, + streaming_timing: false, + close_timing: false, + } + } +} + +impl TracingConfig { + /// Set the tracing level for the `connect` operation. + /// + /// # Examples + /// + /// ``` + /// use tracing::Level; + /// use wireframe::client::TracingConfig; + /// + /// let config = TracingConfig::default().with_connect_level(Level::TRACE); + /// let _ = config; + /// ``` + #[must_use] + pub fn with_connect_level(mut self, level: Level) -> Self { + self.connect_level = level; + self + } + + /// Enable or disable per-command timing for the `connect` operation. + /// + /// When enabled, an event recording `elapsed_us` is emitted at `DEBUG` + /// level when the connect operation completes. + /// + /// # Examples + /// + /// ``` + /// use wireframe::client::TracingConfig; + /// + /// let config = TracingConfig::default().with_connect_timing(true); + /// let _ = config; + /// ``` + #[must_use] + pub fn with_connect_timing(mut self, enabled: bool) -> Self { + self.connect_timing = enabled; + self + } + + /// Set the tracing level for `send` and `send_envelope` operations. + #[must_use] + pub fn with_send_level(mut self, level: Level) -> Self { + self.send_level = level; + self + } + + /// Enable or disable per-command timing for `send` and `send_envelope`. + #[must_use] + pub fn with_send_timing(mut self, enabled: bool) -> Self { + self.send_timing = enabled; + self + } + + /// Set the tracing level for `receive` and `receive_envelope` operations. + #[must_use] + pub fn with_receive_level(mut self, level: Level) -> Self { + self.receive_level = level; + self + } + + /// Enable or disable per-command timing for `receive` and + /// `receive_envelope`. + #[must_use] + pub fn with_receive_timing(mut self, enabled: bool) -> Self { + self.receive_timing = enabled; + self + } + + /// Set the tracing level for `call` and `call_correlated` operations. + #[must_use] + pub fn with_call_level(mut self, level: Level) -> Self { + self.call_level = level; + self + } + + /// Enable or disable per-command timing for `call` and `call_correlated`. + #[must_use] + pub fn with_call_timing(mut self, enabled: bool) -> Self { + self.call_timing = enabled; + self + } + + /// Set the tracing level for `call_streaming` operations. + #[must_use] + pub fn with_streaming_level(mut self, level: Level) -> Self { + self.streaming_level = level; + self + } + + /// Enable or disable per-command timing for `call_streaming`. + #[must_use] + pub fn with_streaming_timing(mut self, enabled: bool) -> Self { + self.streaming_timing = enabled; + self + } + + /// Set the tracing level for the `close` operation. + #[must_use] + pub fn with_close_level(mut self, level: Level) -> Self { + self.close_level = level; + self + } + + /// Enable or disable per-command timing for the `close` operation. + #[must_use] + pub fn with_close_timing(mut self, enabled: bool) -> Self { + self.close_timing = enabled; + self + } + + /// Set the tracing level for all operations at once. + /// + /// # Examples + /// + /// ``` + /// use tracing::Level; + /// use wireframe::client::TracingConfig; + /// + /// let config = TracingConfig::default().with_all_levels(Level::TRACE); + /// let _ = config; + /// ``` + #[must_use] + pub fn with_all_levels(mut self, level: Level) -> Self { + self.connect_level = level; + self.send_level = level; + self.receive_level = level; + self.call_level = level; + self.streaming_level = level; + self.close_level = level; + self + } + + /// Enable or disable per-command timing for all operations at once. + /// + /// # Examples + /// + /// ``` + /// use wireframe::client::TracingConfig; + /// + /// let config = TracingConfig::default().with_all_timing(true); + /// let _ = config; + /// ``` + #[must_use] + pub fn with_all_timing(mut self, enabled: bool) -> Self { + self.connect_timing = enabled; + self.send_timing = enabled; + self.receive_timing = enabled; + self.call_timing = enabled; + self.streaming_timing = enabled; + self.close_timing = enabled; + self + } +} + +#[cfg(test)] +mod tests { + //! Verifies `TracingConfig` default values, bulk setters, and per-field + //! setter isolation. Helpers `assert_levels` and `assert_timing_flags` + //! compare all six operation slots in a single call. + + use rstest::rstest; + use tracing::Level; + + use super::TracingConfig; + + /// Assert all level fields match the expected pattern. + /// + /// Order: connect, send, receive, call, streaming, close. + fn assert_levels( + cfg: &TracingConfig, + [connect, send, receive, call, streaming, close]: [Level; 6], + ) { + assert_eq!(cfg.connect_level, connect, "connect_level"); + assert_eq!(cfg.send_level, send, "send_level"); + assert_eq!(cfg.receive_level, receive, "receive_level"); + assert_eq!(cfg.call_level, call, "call_level"); + assert_eq!(cfg.streaming_level, streaming, "streaming_level"); + assert_eq!(cfg.close_level, close, "close_level"); + } + + /// Assert all timing flags match the expected pattern. + /// + /// Order: connect, send, receive, call, streaming, close. + fn assert_timing_flags( + cfg: &TracingConfig, + [connect, send, receive, call, streaming, close]: [bool; 6], + ) { + assert_eq!(cfg.connect_timing, connect, "connect_timing"); + assert_eq!(cfg.send_timing, send, "send_timing"); + assert_eq!(cfg.receive_timing, receive, "receive_timing"); + assert_eq!(cfg.call_timing, call, "call_timing"); + assert_eq!(cfg.streaming_timing, streaming, "streaming_timing"); + assert_eq!(cfg.close_timing, close, "close_timing"); + } + + #[test] + fn default_levels_are_info_for_lifecycle_debug_for_data_ops() { + let cfg = TracingConfig::default(); + assert_levels( + &cfg, + [ + Level::INFO, + Level::DEBUG, + Level::DEBUG, + Level::DEBUG, + Level::DEBUG, + Level::INFO, + ], + ); + } + + #[test] + fn default_timing_flags_are_all_false() { + let cfg = TracingConfig::default(); + assert_timing_flags(&cfg, [false; 6]); + } + + #[test] + fn with_all_levels_updates_every_level_field() { + let cfg = TracingConfig::default().with_all_levels(Level::TRACE); + assert_levels(&cfg, [Level::TRACE; 6]); + } + + #[test] + fn with_all_timing_true_enables_every_flag() { + let cfg = TracingConfig::default().with_all_timing(true); + assert_timing_flags(&cfg, [true; 6]); + } + + #[test] + fn with_all_timing_false_disables_every_flag() { + let cfg = TracingConfig::default() + .with_all_timing(true) + .with_all_timing(false); + assert_timing_flags(&cfg, [false; 6]); + } + + #[rstest] + #[case::connect( + TracingConfig::default().with_connect_timing(true), + [true, false, false, false, false, false], + )] + #[case::send( + TracingConfig::default().with_send_timing(true), + [false, true, false, false, false, false], + )] + #[case::receive( + TracingConfig::default().with_receive_timing(true), + [false, false, true, false, false, false], + )] + #[case::call( + TracingConfig::default().with_call_timing(true), + [false, false, false, true, false, false], + )] + #[case::streaming( + TracingConfig::default().with_streaming_timing(true), + [false, false, false, false, true, false], + )] + #[case::close( + TracingConfig::default().with_close_timing(true), + [false, false, false, false, false, true], + )] + fn individual_timing_setters_only_affect_their_flag( + #[case] cfg: TracingConfig, + #[case] expected: [bool; 6], + ) { + assert_timing_flags(&cfg, expected); + } + + #[rstest] + #[case::connect( + TracingConfig::default().with_connect_level(Level::TRACE), + [Level::TRACE, Level::DEBUG, Level::DEBUG, Level::DEBUG, Level::DEBUG, Level::INFO], + )] + #[case::send( + TracingConfig::default().with_send_level(Level::TRACE), + [Level::INFO, Level::TRACE, Level::DEBUG, Level::DEBUG, Level::DEBUG, Level::INFO], + )] + #[case::receive( + TracingConfig::default().with_receive_level(Level::TRACE), + [Level::INFO, Level::DEBUG, Level::TRACE, Level::DEBUG, Level::DEBUG, Level::INFO], + )] + #[case::call( + TracingConfig::default().with_call_level(Level::TRACE), + [Level::INFO, Level::DEBUG, Level::DEBUG, Level::TRACE, Level::DEBUG, Level::INFO], + )] + #[case::streaming( + TracingConfig::default().with_streaming_level(Level::TRACE), + [Level::INFO, Level::DEBUG, Level::DEBUG, Level::DEBUG, Level::TRACE, Level::INFO], + )] + #[case::close( + TracingConfig::default().with_close_level(Level::TRACE), + [Level::INFO, Level::DEBUG, Level::DEBUG, Level::DEBUG, Level::DEBUG, Level::TRACE], + )] + fn individual_level_setters_only_affect_their_field( + #[case] cfg: TracingConfig, + #[case] expected: [Level; 6], + ) { + assert_levels(&cfg, expected); + } +} diff --git a/src/client/tracing_helpers.rs b/src/client/tracing_helpers.rs new file mode 100644 index 00000000..577dfb16 --- /dev/null +++ b/src/client/tracing_helpers.rs @@ -0,0 +1,154 @@ +//! Tracing span and event helpers for wireframe client operations. +//! +//! These helpers centralise span creation with dynamic level selection and +//! per-command timing emission, keeping the instrumentation logic out of the +//! hot-path client methods. + +use std::time::Instant; + +use tracing::{Level, Span}; + +use super::tracing_config::TracingConfig; + +/// Create a tracing span at a dynamically selected level. +/// +/// The level is matched against the five `tracing::Level` variants. Each +/// branch calls the corresponding `tracing::_span!` macro, which +/// ensures the span metadata is statically known per branch while the +/// branch selection is dynamic. +macro_rules! dynamic_span { + ($level:expr, $name:expr $(, $($field:tt)*)?) => { + match $level { + Level::ERROR => tracing::error_span!($name $(, $($field)*)?), + Level::WARN => tracing::warn_span!($name $(, $($field)*)?), + Level::INFO => tracing::info_span!($name $(, $($field)*)?), + Level::DEBUG => tracing::debug_span!($name $(, $($field)*)?), + Level::TRACE => tracing::trace_span!($name $(, $($field)*)?), + } + }; +} + +/// Create a span for the `connect` operation. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion — five match arms are inherent" +)] +pub(crate) fn connect_span(config: &TracingConfig, peer_addr: &str) -> Span { + dynamic_span!( + config.connect_level, + "client.connect", + peer.addr = peer_addr + ) +} + +/// Create a span for the `send` operation. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion" +)] +pub(crate) fn send_span(config: &TracingConfig, frame_bytes: usize) -> Span { + dynamic_span!(config.send_level, "client.send", frame.bytes = frame_bytes) +} + +/// Create a span for the `receive` operation. +/// +/// The `frame.bytes` and `result` fields are recorded after the frame arrives +/// using [`Span::record`]. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion" +)] +pub(crate) fn receive_span(config: &TracingConfig) -> Span { + dynamic_span!( + config.receive_level, + "client.receive", + frame.bytes = tracing::field::Empty, + result = tracing::field::Empty + ) +} + +/// Create a span for the `send_envelope` operation. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion" +)] +pub(crate) fn send_envelope_span( + config: &TracingConfig, + correlation_id: u64, + frame_bytes: usize, +) -> Span { + dynamic_span!( + config.send_level, + "client.send_envelope", + correlation_id = correlation_id, + frame.bytes = frame_bytes + ) +} + +/// Create a span for the `call` operation. +/// +/// The `result` field is recorded when the call completes using +/// [`Span::record`]. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion" +)] +pub(crate) fn call_span(config: &TracingConfig) -> Span { + dynamic_span!( + config.call_level, + "client.call", + result = tracing::field::Empty + ) +} + +/// Create a span for the `call_correlated` operation. +/// +/// The `correlation_id` and `result` fields are recorded during the call +/// using [`Span::record`]. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion" +)] +pub(crate) fn call_correlated_span(config: &TracingConfig) -> Span { + dynamic_span!( + config.call_level, + "client.call_correlated", + correlation_id = tracing::field::Empty, + result = tracing::field::Empty + ) +} + +/// Create a span for the `call_streaming` operation. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion" +)] +pub(crate) fn streaming_span(config: &TracingConfig, correlation_id: u64) -> Span { + dynamic_span!( + config.streaming_level, + "client.call_streaming", + correlation_id = correlation_id, + frame.bytes = tracing::field::Empty + ) +} + +/// Create a span for the `close` operation. +#[expect( + clippy::cognitive_complexity, + reason = "complexity from dynamic_span! macro expansion" +)] +pub(crate) fn close_span(config: &TracingConfig) -> Span { + dynamic_span!(config.close_level, "client.close") +} + +/// Record elapsed time if timing was enabled for this operation. +/// +/// The `start` parameter is `None` when timing is disabled and +/// `Some(instant)` when enabled. When `Some`, an event is emitted with +/// the `elapsed_us` field at `DEBUG` level. +pub(crate) fn emit_timing_event(start: Option) { + if let Some(start) = start { + let elapsed_us = start.elapsed().as_micros(); + tracing::debug!(elapsed_us = elapsed_us, "operation.timing"); + } +} diff --git a/tests/features/client_tracing.feature b/tests/features/client_tracing.feature new file mode 100644 index 00000000..ce3dc221 --- /dev/null +++ b/tests/features/client_tracing.feature @@ -0,0 +1,37 @@ +Feature: Client structured logging and tracing spans + The wireframe client emits tracing spans and structured events around + connect, send, receive, call, streaming, and close operations. + Per-command timing can be enabled via TracingConfig. + + Scenario: Connect emits a tracing span with the peer address + Given a running echo server for tracing tests + And a client with connect timing enabled + When the client connects to the server + Then the tracing output contains "client.connect" + And the tracing output contains the peer address + + Scenario: Send emits a tracing span with frame size + Given a connected tracing client with send timing enabled + When the client sends an envelope via the tracing client + Then the tracing output contains "client.send" + And the tracing output contains "frame.bytes" + + Scenario: Receive emits a tracing span recording result + Given a connected tracing client with receive timing enabled + When the client sends and receives via the tracing client + Then the tracing output contains "client.receive" + + Scenario: Per-command timing emits elapsed microseconds + Given a connected tracing client with send timing enabled + When the client sends an envelope via the tracing client + Then the tracing output contains "elapsed_us" + + Scenario: Timing is not emitted when disabled + Given a connected tracing client with default config + When the client sends an envelope via the tracing client + Then the tracing output does not contain "elapsed_us" + + Scenario: Close emits a tracing span + Given a connected tracing client with close timing enabled + When the tracing client closes the connection + Then the tracing output contains "client.close" diff --git a/tests/fixtures/client_tracing.rs b/tests/fixtures/client_tracing.rs new file mode 100644 index 00000000..3212403a --- /dev/null +++ b/tests/fixtures/client_tracing.rs @@ -0,0 +1,256 @@ +//! `ClientTracingWorld` fixture for rstest-bdd tracing tests. +//! +//! Provides a thread-local tracing subscriber that captures formatted output +//! into a shared buffer for assertion in step definitions. + +#![expect( + clippy::excessive_nesting, + reason = "async closures within spawned echo server are inherently nested" +)] + +use std::{ + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +use bytes::Bytes; +use futures::{SinkExt, StreamExt}; +use rstest::fixture; +use tokio::{net::TcpListener, task::JoinHandle}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + app::Envelope, + client::{TracingConfig, WireframeClient}, + rewind_stream::RewindStream, + serializer::BincodeSerializer, +}; +/// Re-export `TestResult` from `wireframe_testing` for use in steps. +pub use wireframe_testing::TestResult; +use wireframe_testing::{ServerMode, process_frame}; + +/// Client type alias for tracing tests. +type TestClient = WireframeClient>; + +/// Writer that appends formatted tracing output to a shared buffer. +#[derive(Clone, Debug)] +struct CaptureWriter { + buf: Arc>>, +} + +impl CaptureWriter { + fn new(buf: Arc>>) -> Self { Self { buf } } +} + +impl std::io::Write for CaptureWriter { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.buf + .lock() + .map_err(|e| std::io::Error::other(e.to_string()))? + .extend_from_slice(data); + Ok(data.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { Ok(()) } +} + +impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for CaptureWriter { + type Writer = Self; + + fn make_writer(&'a self) -> Self::Writer { self.clone() } +} + +/// Test world for client tracing BDD scenarios. +pub struct ClientTracingWorld { + runtime: Option, + runtime_error: Option, + addr: Option, + server: Option>, + client: Option, + tracing_config: TracingConfig, + captured: Arc>>, + _subscriber_guard: tracing::subscriber::DefaultGuard, +} + +impl Drop for ClientTracingWorld { + fn drop(&mut self) { + if let (Some(runtime), Some(handle)) = (self.runtime.as_ref(), self.server.take()) { + runtime.block_on(async move { handle.abort() }); + } + } +} + +/// Create a new `ClientTracingWorld` with default tracing config. +fn new_world(config: TracingConfig) -> ClientTracingWorld { + let (runtime, runtime_error) = match tokio::runtime::Runtime::new() { + Ok(rt) => (Some(rt), None), + Err(err) => (None, Some(format!("failed to create runtime: {err}"))), + }; + let captured = Arc::new(Mutex::new(Vec::new())); + let writer = CaptureWriter::new(Arc::clone(&captured)); + + let subscriber = tracing_subscriber::fmt() + .with_writer(writer) + .with_level(true) + .with_ansi(false) + .with_max_level(tracing::Level::TRACE) + .finish(); + let guard = tracing::subscriber::set_default(subscriber); + + ClientTracingWorld { + runtime, + runtime_error, + addr: None, + server: None, + client: None, + tracing_config: config, + captured, + _subscriber_guard: guard, + } +} + +/// Fixture for `ClientTracingWorld` with default tracing config. +#[rustfmt::skip] +#[fixture] +pub fn client_tracing_world() -> ClientTracingWorld { + new_world(TracingConfig::default()) +} + +impl ClientTracingWorld { + /// Start a standard echo server. + /// + /// # Errors + /// Returns an error if binding fails. + pub async fn start_echo_server(&mut self) -> TestResult { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let handle = tokio::spawn(async move { + let Ok((stream, _)) = listener.accept().await else { + return; + }; + let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); + + while let Some(Ok(bytes)) = framed.next().await { + let Some(response_bytes) = process_frame(ServerMode::Echo, &bytes) else { + break; + }; + if framed.send(Bytes::from(response_bytes)).await.is_err() { + break; + } + } + }); + + self.addr = Some(addr); + self.server = Some(handle); + Ok(()) + } + + /// Update the tracing config. + pub fn set_tracing_config(&mut self, config: TracingConfig) { self.tracing_config = config; } + + /// Return a cloned handle to the owned Tokio runtime. + /// + /// Callers use `let rt = world.handle()?; rt.block_on(world.some_async_method())` + /// to run async methods: cloning the handle first avoids conflicting + /// borrows between the runtime and the `&mut self` taken by async methods. + /// + /// # Errors + /// Returns an error if the runtime failed to initialize. + pub fn handle(&self) -> TestResult { + self.runtime + .as_ref() + .map(|rt| rt.handle().clone()) + .ok_or_else(|| { + self.runtime_error + .clone() + .unwrap_or_else(|| "runtime unavailable".to_string()) + .into() + }) + } + + /// Connect to the server with the current tracing config. + /// + /// # Errors + /// Returns an error if connection fails. + pub async fn connect(&mut self) -> TestResult { + let addr = self.addr.ok_or("server address missing")?; + let client = WireframeClient::builder() + .tracing_config(self.tracing_config.clone()) + .connect(addr) + .await?; + self.client = Some(client); + Ok(()) + } + + /// Send an envelope via the connected client. + /// + /// # Errors + /// Returns an error if send fails. + pub async fn send_envelope(&mut self) -> TestResult { + let client = self.client.as_mut().ok_or("client not connected")?; + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + client.send(&envelope).await?; + Ok(()) + } + + /// Send an envelope and receive the echoed response. + /// + /// # Errors + /// Returns an error if send or receive fails. + pub async fn send_and_receive(&mut self) -> TestResult { + let client = self.client.as_mut().ok_or("client not connected")?; + let envelope = Envelope::new(1, None, vec![1, 2, 3]); + let _response: Envelope = client.call(&envelope).await?; + Ok(()) + } + + /// Close the client connection. + pub async fn close_connection(&mut self) { + if let Some(client) = self.client.take() { + client.close().await; + } + } + + /// Return the peer address string. + pub fn peer_addr_string(&self) -> String { + self.addr.map(|a| a.to_string()).unwrap_or_default() + } + + /// Check whether captured tracing output contains a needle. + pub fn output_contains(&self, needle: &str) -> bool { + let Ok(buf) = self.captured.lock() else { + return false; + }; + let output = String::from_utf8_lossy(&buf); + output.contains(needle) + } + + /// Assert that the output contains the needle. + /// + /// # Errors + /// Returns an error if the needle is not found. + pub fn assert_output_contains(&self, needle: &str) -> TestResult { + if self.output_contains(needle) { + Ok(()) + } else { + let buf = self + .captured + .lock() + .map_err(|e| std::io::Error::other(format!("lock captured buffer: {e}")))?; + let output = String::from_utf8_lossy(&buf); + Err(format!("expected output to contain {needle:?}, got:\n{output}").into()) + } + } + + /// Assert that the output does NOT contain the needle. + /// + /// # Errors + /// Returns an error if the needle is found. + pub fn assert_output_not_contains(&self, needle: &str) -> TestResult { + if self.output_contains(needle) { + Err(format!("expected output NOT to contain {needle:?}, but it was present").into()) + } else { + Ok(()) + } + } +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index 994c3333..68469fcc 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -11,6 +11,7 @@ pub mod client_request_hooks; pub mod client_runtime; pub mod client_send_streaming; pub mod client_streaming; +pub mod client_tracing; pub mod codec_error; pub mod codec_performance_benchmarks; pub mod codec_property_roundtrip; diff --git a/tests/scenarios/client_tracing_scenarios.rs b/tests/scenarios/client_tracing_scenarios.rs new file mode 100644 index 00000000..c27082da --- /dev/null +++ b/tests/scenarios/client_tracing_scenarios.rs @@ -0,0 +1,51 @@ +//! Scenario tests for client tracing span behaviours. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::client_tracing::*; + +#[scenario( + path = "tests/features/client_tracing.feature", + name = "Connect emits a tracing span with the peer address" +)] +fn connect_span_with_peer_address(client_tracing_world: ClientTracingWorld) { + let _ = client_tracing_world; +} + +#[scenario( + path = "tests/features/client_tracing.feature", + name = "Send emits a tracing span with frame size" +)] +fn send_span_with_frame_size(client_tracing_world: ClientTracingWorld) { + let _ = client_tracing_world; +} + +#[scenario( + path = "tests/features/client_tracing.feature", + name = "Receive emits a tracing span recording result" +)] +fn receive_span_recording_result(client_tracing_world: ClientTracingWorld) { + let _ = client_tracing_world; +} + +#[scenario( + path = "tests/features/client_tracing.feature", + name = "Per-command timing emits elapsed microseconds" +)] +fn timing_emits_elapsed_us(client_tracing_world: ClientTracingWorld) { + let _ = client_tracing_world; +} + +#[scenario( + path = "tests/features/client_tracing.feature", + name = "Timing is not emitted when disabled" +)] +fn timing_not_emitted_when_disabled(client_tracing_world: ClientTracingWorld) { + let _ = client_tracing_world; +} + +#[scenario( + path = "tests/features/client_tracing.feature", + name = "Close emits a tracing span" +)] +fn close_emits_span(client_tracing_world: ClientTracingWorld) { let _ = client_tracing_world; } diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index 9358919d..1cc6c91a 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -15,6 +15,7 @@ mod client_request_hooks_scenarios; mod client_runtime_scenarios; mod client_send_streaming_scenarios; mod client_streaming_scenarios; +mod client_tracing_scenarios; mod codec_error_scenarios; mod codec_performance_benchmarks_scenarios; mod codec_property_roundtrip_scenarios; diff --git a/tests/steps/client_tracing_steps.rs b/tests/steps/client_tracing_steps.rs new file mode 100644 index 00000000..71219552 --- /dev/null +++ b/tests/steps/client_tracing_steps.rs @@ -0,0 +1,128 @@ +//! Step definitions for wireframe client tracing behavioural tests. + +use rstest_bdd_macros::{given, then, when}; +use wireframe::client::TracingConfig; + +use crate::fixtures::client_tracing::{ClientTracingWorld, TestResult}; + +// --------------------------------------------------------------------------- +// Given steps +// --------------------------------------------------------------------------- + +#[given("a running echo server for tracing tests")] +fn given_echo_server(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.start_echo_server()) +} + +#[given("a client with connect timing enabled")] +fn given_client_with_connect_timing(client_tracing_world: &mut ClientTracingWorld) { + client_tracing_world.set_tracing_config(TracingConfig::default().with_connect_timing(true)); +} + +#[given("a connected tracing client with send timing enabled")] +fn given_connected_send_timing(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.start_echo_server())?; + client_tracing_world.set_tracing_config(TracingConfig::default().with_send_timing(true)); + rt.block_on(client_tracing_world.connect()) +} + +#[given("a connected tracing client with receive timing enabled")] +fn given_connected_receive_timing(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.start_echo_server())?; + client_tracing_world.set_tracing_config(TracingConfig::default().with_receive_timing(true)); + rt.block_on(client_tracing_world.connect()) +} + +#[given("a connected tracing client with default config")] +fn given_connected_default_config(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.start_echo_server())?; + rt.block_on(client_tracing_world.connect()) +} + +#[given("a connected tracing client with close timing enabled")] +fn given_connected_close_timing(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.start_echo_server())?; + client_tracing_world.set_tracing_config(TracingConfig::default().with_close_timing(true)); + rt.block_on(client_tracing_world.connect()) +} + +// --------------------------------------------------------------------------- +// When steps +// --------------------------------------------------------------------------- + +#[when("the client connects to the server")] +fn when_client_connects(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.connect()) +} + +#[when("the client sends an envelope via the tracing client")] +fn when_client_sends(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.send_envelope()) +} + +#[when("the client sends and receives via the tracing client")] +fn when_client_sends_and_receives(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.send_and_receive()) +} + +#[when("the tracing client closes the connection")] +fn when_client_closes(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let rt = client_tracing_world.handle()?; + rt.block_on(client_tracing_world.close_connection()); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Then steps +// --------------------------------------------------------------------------- + +#[then("the tracing output contains \"client.connect\"")] +fn then_output_contains_connect(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + client_tracing_world.assert_output_contains("client.connect") +} + +#[then("the tracing output contains the peer address")] +fn then_output_contains_peer_address(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + let addr = client_tracing_world.peer_addr_string(); + client_tracing_world.assert_output_contains(&addr) +} + +#[then("the tracing output contains \"client.send\"")] +fn then_output_contains_send(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + client_tracing_world.assert_output_contains("client.send") +} + +#[then("the tracing output contains \"frame.bytes\"")] +fn then_output_contains_frame_bytes(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + client_tracing_world.assert_output_contains("frame.bytes") +} + +#[then("the tracing output contains \"client.receive\"")] +fn then_output_contains_receive(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + client_tracing_world.assert_output_contains("client.receive") +} + +#[then("the tracing output contains \"elapsed_us\"")] +fn then_output_contains_elapsed_us(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + client_tracing_world.assert_output_contains("elapsed_us") +} + +#[then("the tracing output does not contain \"elapsed_us\"")] +fn then_output_not_contains_elapsed_us( + client_tracing_world: &mut ClientTracingWorld, +) -> TestResult { + client_tracing_world.assert_output_not_contains("elapsed_us") +} + +#[then("the tracing output contains \"client.close\"")] +fn then_output_contains_close(client_tracing_world: &mut ClientTracingWorld) -> TestResult { + client_tracing_world.assert_output_contains("client.close") +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 9623deab..7702cef7 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -11,6 +11,7 @@ mod client_request_hooks_steps; mod client_runtime_steps; mod client_send_streaming_steps; mod client_streaming_steps; +mod client_tracing_steps; mod codec_error_steps; mod codec_performance_benchmarks_steps; mod codec_property_roundtrip_steps;