diff --git a/Cargo.toml b/Cargo.toml index 525286b3..7f98d22b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ features = ["metrics"] [dependencies] serde = { version = "1.0.219", features = ["derive"] } -bincode = "2.0.1" +bincode = { version = "2.0.1", features = ["serde"] } tokio = { version = "1.47.1", default-features = false, features = [ "net", "signal", @@ -82,6 +82,7 @@ metrics-exporter-prometheus = { version = "0.17.2", optional = true, features = default = ["metrics", "serializer-bincode"] metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"] serializer-bincode = [] +serializer-serde = [] advanced-tests = [] examples = [] test-support = [] diff --git a/docs/adr-005-serializer-abstraction.md b/docs/adr-005-serializer-abstraction.md index 7aa127ec..4d3a57b5 100644 --- a/docs/adr-005-serializer-abstraction.md +++ b/docs/adr-005-serializer-abstraction.md @@ -2,11 +2,11 @@ ## Status -Proposed. +Accepted. ## Date -2025-12-30. +2025-12-30 (accepted update: 2026-02-22). ## Context and Problem Statement @@ -82,6 +82,23 @@ optional wire-rs or Serde adaptors to reduce boilerplate. This allows frame metadata to participate in deserialization, supports version negotiation, and avoids committing Wireframe to a single serializer. +Accepted implementation details: + +- `message::EncodeWith` and `message::DecodeWith` define serializer-aware + adaptor boundaries used by `Serializer`. +- `message::DeserializeContext` carries parsed frame metadata into + `Serializer::deserialize_with_context`. +- `Serializer` now supports `deserialize_with_context` with a default fallback + to `deserialize`, preserving existing serializers. +- Existing bincode-centric flows remain source-compatible through the + `message::Message` compatibility layer. +- Legacy `Message` compatibility is now an explicit serializer opt-in through + `serializer::MessageCompatibilitySerializer`, avoiding blanket impl lockout + for serializer-specific adaptors. +- An optional Serde bridge ships behind feature `serializer-serde` via + `message::serde_bridge::{SerdeMessage, IntoSerdeMessage}` plus + `serializer::SerdeSerializerBridge`. + ## Goals and Non-Goals ### Goals @@ -123,9 +140,11 @@ avoids committing Wireframe to a single serializer. ## Outstanding Decisions -- Define the precise `MessageBody` surface (borrowed vs owned payloads). -- Decide how codec metadata is passed into the deserializer context. -- Choose which optional adaptors ship by default and which are feature-gated. +- Decide whether envelope encoding should remain compatibility-first bincode + for `Message` implementers across all serializers, or move to stricter + serializer-specific envelope adaptors in a future major release. +- Evaluate whether a wire-rs bridge should be added alongside the Serde bridge + in a follow-up roadmap item. ## Architectural Rationale diff --git a/docs/execplans/9-5-1-decouple-message-encoding-from-bincode.md b/docs/execplans/9-5-1-decouple-message-encoding-from-bincode.md new file mode 100644 index 00000000..7e92e7e4 --- /dev/null +++ b/docs/execplans/9-5-1-decouple-message-encoding-from-bincode.md @@ -0,0 +1,503 @@ +# 9.5.1 Decouple message encoding from bincode-specific traits + +This Execution Plan (ExecPlan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. + +Status: COMPLETED (2026-02-22) + +Maintenance cadence: update `Progress` immediately after each stage completes, +record architecture and scope choices in `Decision Log` when they are made, and +write `Outcomes & Retrospective` at Stage E completion. + +No `PLANS.md` exists in this repository as of 2026-02-21. + +## Purpose / big picture + +Roadmap item `9.5.1` hardens the pluggable codec effort by removing +`bincode`-specific trait coupling from message encoding boundaries. The +implementation must preserve existing bincode flows, add a serialiser-agnostic +message adapter surface, introduce an optional bridge that reduces manual +boilerplate, and define how frame metadata reaches deserialisation for version +negotiation. + +After this work, maintainers and library consumers can observe: + +- Client and server messaging APIs encode and decode message types through a + serialiser-agnostic adapter contract instead of direct `bincode` trait bounds. +- Existing bincode message types continue to work with minimal or zero source + changes through compatibility shims. +- A feature-gated Serde (serialisation/deserialisation) bridge is available as + an optional path to reduce manual adapter implementations. +- Deserializers can inspect frame metadata through an explicit context object, + enabling protocol version negotiation patterns. +- Unit tests use `rstest`, behavioural tests use `rstest-bdd` v0.5.0, design + decisions are recorded in `docs/adr-005-serializer-abstraction.md`, public + interface changes are documented in `docs/users-guide.md`, and roadmap item + `9.5.1` is marked done only after all quality gates pass. + +Authority boundaries for documentation: + +- This ExecPlan is the implementation staging guide and task tracker. +- `docs/adr-005-serializer-abstraction.md` is the source of truth for + serializer-boundary architecture and trade-offs. +- `docs/users-guide.md` is the source of truth for public API contracts and + migration guidance for library consumers. + +## Constraints + +- Preserve runtime behaviour for existing bincode users unless explicitly + documented as changed in migration guidance. +- Keep `WireframeApp` and `WireframeClient` defaults on bincode-compatible + behaviour, so existing applications do not need immediate serialiser rewrites. +- Use `rstest` for unit tests and `rstest-bdd` v0.5.0 for behavioural tests. +- Do not introduce `wire-rs` as a mandatory dependency. This item will satisfy + the roadmap bridge requirement via an optional Serde bridge. +- Keep framing, fragmentation, correlation, and routing semantics unchanged + except for introducing metadata visibility to deserialization context. +- Record design decisions in `docs/adr-005-serializer-abstraction.md`. If + metadata context semantics change materially, update + `docs/message-versioning.md` as well. +- Update `docs/users-guide.md` with public API and migration guidance for + existing bincode users. +- Mark `9.5.1` and its child bullets done in `docs/roadmap.md` only after + implementation, documentation updates, and all validations succeed. +- Follow repository quality gates and run commands with `set -o pipefail` and + `tee` logging. + +## Tolerances (exception triggers) + +- Scope: if implementation requires touching more than 26 files or more than + 1,800 net changed lines, stop and re-scope before proceeding. +- Interface: if preserving source compatibility for bincode users requires + impossible trait coherence trade-offs, stop and present options before making + a breaking change. +- Dependencies: if the chosen optional bridge needs a new external crate beyond + existing workspace dependencies, stop and escalate. +- Iterations: if the same failing root cause persists after 3 fix attempts, + stop and document alternatives in `Decision Log`. +- Time: if a single stage takes more than one focused day without meeting its + validation target, stop and re-plan. +- Ambiguity: if metadata context shape has multiple viable designs with + materially different API ergonomics, stop and present options with trade-offs. + +## Risks + +- Risk: adapter trait design can create confusing generic bounds and regress + ergonomics. Severity: high. Likelihood: medium. Mitigation: keep trait + surface minimal, provide helper aliases, and document migration with + before/after snippets. + +- Risk: backward compatibility for bincode users can silently break if helper + methods or trait imports move. Severity: high. Likelihood: medium. + Mitigation: preserve compatibility shims and add explicit regression tests + for legacy derive-and-send flows. + +- Risk: metadata context plumbing can add allocations or copy-heavy paths in + hot decode loops. Severity: medium. Likelihood: medium. Mitigation: expose + borrowed metadata slices where possible and verify no extra heap allocation + is introduced in default bincode path. + +- Risk: behavioural tests may assert implementation details rather than user + observable outcomes. Severity: medium. Likelihood: medium. Mitigation: keep + behaviour-driven development (BDD) scenarios focused on round-trip + compatibility, migration-visible behaviour, and version negotiation outcomes. + +- Risk: optional bridge feature matrix can drift from defaults and rot. + Severity: medium. Likelihood: medium. Mitigation: include targeted unit + coverage for feature-gated code and ensure `make test` with `--all-features` + exercises the bridge path. + +## Progress + +- [x] (2026-02-21 00:00Z) Drafted ExecPlan for roadmap item `9.5.1` with + staged implementation and acceptance criteria. +- [x] Stage A complete: baseline analysis, adapter API decision, and context + shape finalized. +- [x] Stage B complete: serializer-agnostic message adapter layer and bincode + compatibility shim implemented. +- [x] Stage C complete: metadata context propagation wired through decode path + and version-negotiation integration tests added. +- [x] Stage D complete: optional Serde bridge implemented and validated. +- [x] Stage E complete: documentation updates, roadmap completion, and full + quality gate validation. + +## Surprises & Discoveries + +- Observation: there is no `PLANS.md`, so this ExecPlan is the sole execution + guide for the item. Evidence: repository root listing on 2026-02-21. Impact: + all execution details must remain self-contained in this file. + +- Observation: `src/message.rs` currently defines `Message` as + `bincode::Encode + bincode::BorrowDecode`, and serializer methods require + that trait. Evidence: `src/message.rs`, `src/serializer.rs`. Impact: the + coupling is direct and affects both client and server API bounds. + +- Observation: `FrameMetadata::parse` already runs before fallback full + deserialization in server connection handling. Evidence: + `src/app/connection.rs::parse_envelope`. Impact: metadata context can be + introduced without changing framing order. + +- Observation: behavioural test harness is already on `rstest-bdd` v0.5.0 with + strict compile-time validation. Evidence: `Cargo.toml` dev-dependencies and + `Makefile` target `test-bdd`. Impact: new BDD coverage can follow existing + `tests/features|fixtures|steps|scenarios` structure without runner migration + work. + +- Observation: `serde` is already a normal dependency. + Evidence: `Cargo.toml`. Impact: the optional bridge requirement can be + satisfied without adding a new crate dependency. + +## Decision Log + +- Decision: implement Architecture Decision Record (ADR-005) Option B + (serializer-agnostic boundary plus adapters) using compatibility-first + layering. Rationale: satisfies roadmap goals while minimizing disruption for + current users. Date/Author: 2026-02-21 / Codex. + +- Decision: satisfy the roadmap bridge requirement with an optional Serde + bridge in this item; defer wire-rs bridge to a later roadmap task unless + explicitly requested. Rationale: Serde is already available in the dependency + graph and avoids introducing new external dependencies. Date/Author: + 2026-02-21 / Codex. + +- Decision: add an explicit deserialization context object that carries frame + metadata rather than relying on hidden globals or implicit thread-local + state. Rationale: explicit context keeps API behaviour testable and supports + version negotiation deterministically. Date/Author: 2026-02-21 / Codex. + +## Outcomes & Retrospective + +- Implemented serializer-agnostic adapter boundaries through + `EncodeWith`/`DecodeWith` and `DeserializeContext`. +- Introduced metadata-context propagation in the inbound parse/deserialize + path. +- Provided optional Serde bridge support behind feature `serializer-serde`. +- Validated serializer boundaries and metadata context handling with behavioural + and unit tests. +- Updated `docs/adr-005-serializer-abstraction.md`, + `docs/users-guide.md`, and `docs/roadmap.md` to reflect shipped behaviour. + +## Context and orientation + +Current coupling points and likely edit targets: + +- `src/message.rs`: + bincode-bound `Message` trait with `to_bytes` and `from_bytes` helpers. +- `src/serializer.rs`: + `Serializer` trait requires `M: Message`; `BincodeSerializer` delegates to + `Message` helpers. +- `src/app/connection.rs`: + metadata parsing and fallback deserialization entry point for inbound frames. +- `src/client/runtime.rs` and `src/client/messaging.rs`: + public client send/receive/call APIs currently bound to `Message`. +- `src/fragment/fragmenter.rs` and `src/fragment/reassembler.rs`: + helper APIs that serialize/deserialize typed messages via `Message`. +- `tests/metadata.rs`: + existing metadata-before-deserialize coverage to extend for context exposure. +- `tests/features/*` plus + `tests/fixtures/*`, `tests/steps/*`, `tests/scenarios/*`: behavioural test + harness based on `rstest-bdd`. + +Reference documentation to keep aligned: + +- `docs/roadmap.md` (`9.5.1` scope and done criteria). +- `docs/adr-005-serializer-abstraction.md` (architecture decision record). +- `docs/message-versioning.md` (metadata and version negotiation behaviour). +- `docs/users-guide.md` (consumer-facing API and migration guidance). +- `docs/generic-message-fragmentation-and-re-assembly-design.md`. +- `docs/multi-packet-and-streaming-responses-design.md`. +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md`. +- `docs/rust-testing-with-rstest-fixtures.md`. +- `docs/reliable-testing-in-rust-via-dependency-injection.md`. +- `docs/rstest-bdd-users-guide.md`. +- `docs/rust-doctest-dry-guide.md`. + +## Plan of work + +### Stage A: baseline and boundary design (no behaviour changes) + +Map every public and internal API currently bounded by `message::Message`, then +define the serializer-agnostic adapter surface and deserialization context +shape. Keep this stage source-compatible and compile-only where possible. + +Planned edits: + +- `src/message.rs` and new `src/message/*` support modules for adapter traits + and context types. +- `src/serializer.rs` trait signature updates with compatibility defaults. +- `docs/adr-005-serializer-abstraction.md` design section update describing the + chosen trait shape before implementation details are finalized. + +Stage A validation: + +- `cargo check --all-targets --all-features` passes. +- No runtime behaviour changes yet. + +Go/no-go: + +- Proceed only after adapter trait signatures and context representation are + settled and documented in `Decision Log`. + +### Stage B: implement serializer-agnostic adapters plus bincode compatibility + +Introduce serializer-aware encode/decode adapter traits and keep bincode users +working via blanket compatibility shims. + +Planned edits: + +- `src/message.rs`: + define adapter traits (encode/decode), context model, and compatibility + helpers for existing bincode message types. +- `src/serializer.rs`: + switch serializer methods to adapter trait bounds and add context-aware + deserialization entry point. +- `src/prelude.rs` and any public re-exports in `src/lib.rs` that must expose + new adapter types. +- Call-site updates in: + `src/client/runtime.rs`, `src/client/messaging.rs`, + `src/client/response_stream.rs`, `src/app/connection.rs`, + `src/fragment/fragmenter.rs`, `src/fragment/reassembler.rs`, and related + tests. + +Stage B validation: + +- Legacy bincode derive types still compile with unchanged call sites in core + examples/tests. +- New unit tests (rstest) prove compatibility and adapter dispatch. + +Go/no-go: + +- Proceed only if bincode compatibility can be preserved without breaking + default APIs. + +### Stage C: expose frame metadata to deserialization context + +Wire parsed metadata into a context object passed to deserializers, enabling +version-aware decode behaviour without changing framing order. + +Planned edits: + +- `src/app/connection.rs`: + build context from `FrameMetadata::parse` output and pass it into + deserializer entry points. +- `src/frame/metadata.rs` or `src/serializer.rs`: + define stable metadata view semantics (what fields are guaranteed and when). +- `tests/metadata.rs`: + extend coverage to assert context receives metadata and fallback behaviour + remains correct. +- Add or extend unit tests around version negotiation plumbing, likely in + `src/app/*` or `tests/*` integration modules. + +Stage C validation: + +- Metadata-aware decode tests pass for parse-success and parse-fallback paths. +- Existing routes and envelope decode behaviour remain unchanged for + non-metadata serializers. + +Go/no-go: + +- Proceed only if context API remains explicit, deterministic, and free of + hidden global state. + +### Stage D: optional bridge and behavioural coverage + +Implement a feature-gated Serde bridge and add BDD scenarios proving observable +compatibility and negotiation behaviour. + +Planned edits: + +- `Cargo.toml` feature table: + add optional bridge feature flag(s) if needed (without new external crates). +- New bridge module(s) under `src/message/` or `src/serializer/` for Serde + helper wrappers/adapters. +- New behavioural suite: + `tests/features/serializer_boundaries.feature`, + `tests/fixtures/serializer_boundaries.rs`, + `tests/steps/serializer_boundaries_steps.rs`, + `tests/scenarios/serializer_boundaries_scenarios.rs`, plus + `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, and `tests/scenarios/mod.rs` + wiring. + +Behavioural scenarios should cover: + +- Existing bincode message workflow still succeeds. +- Optional bridge workflow reduces manual boilerplate and round-trips messages. +- Metadata-driven version selection path is observable via scenario outcomes. + +Stage D validation: + +- `make test-bdd` passes and includes new serializer-boundary scenario names. +- Bridge feature path is exercised under `--all-features`. + +Go/no-go: + +- Proceed only if BDD assertions remain user-observable and not coupled to + private implementation internals. + +### Stage E: docs, migration guidance, roadmap completion, and hardening + +Finalize consumer and design documentation, then run full quality gates. + +Planned edits: + +- `docs/adr-005-serializer-abstraction.md`: + record final API decisions, trade-offs, and status update. +- `docs/users-guide.md`: + add migration guidance for bincode users and document new serializer adapter + surfaces plus metadata context semantics. +- `docs/message-versioning.md`: + update deserialization-context expectations if metadata contract changes. +- `docs/roadmap.md`: + mark `9.5.1` and all child bullets done once validations pass. + +Stage E validation: + +- Documentation matches implemented API. +- Roadmap status matches shipped behaviour. +- All quality gates pass. + +## Concrete steps + +All commands run from repository root: `/home/user/project`. + +1. Baseline before editing: + + ```shell + set -o pipefail && cargo check --all-targets --all-features 2>&1 | tee /tmp/9-5-1-check-baseline.log + set -o pipefail && make test-bdd 2>&1 | tee /tmp/9-5-1-test-bdd-baseline.log + ``` + +2. After Stage B and Stage C edits, run focused verification: + + ```shell + set -o pipefail && cargo test message --all-features 2>&1 | tee /tmp/9-5-1-message-tests.log + set -o pipefail && cargo test metadata --all-features 2>&1 | tee /tmp/9-5-1-metadata-tests.log + set -o pipefail && make test-bdd 2>&1 | tee /tmp/9-5-1-test-bdd.log + ``` + +3. Final quality gates: + + ```shell + set -o pipefail && make fmt 2>&1 | tee /tmp/9-5-1-fmt.log + set -o pipefail && make check-fmt 2>&1 | tee /tmp/9-5-1-check-fmt.log + set -o pipefail && make markdownlint 2>&1 | tee /tmp/9-5-1-markdownlint.log + set -o pipefail && make nixie 2>&1 | tee /tmp/9-5-1-nixie.log + set -o pipefail && make lint 2>&1 | tee /tmp/9-5-1-lint.log + set -o pipefail && make test-doc 2>&1 | tee /tmp/9-5-1-test-doc.log + set -o pipefail && make test 2>&1 | tee /tmp/9-5-1-test.log + ``` + +Expected success indicators: + +- Focused tests and `make test-bdd` include new serializer boundary coverage. +- `make lint` passes with zero warnings (`-D warnings`). +- `make test-doc` and `make test` pass across all enabled features. +- Markdown and formatting gates pass for documentation updates. + +## Validation and acceptance + +Acceptance is complete only when all the following are true: + +- Public APIs no longer require direct `bincode` traits at serializer + boundaries; adapter traits are used instead. +- Existing bincode users retain a clear migration path and compatibility + behaviour. +- Optional bridge support is available (Serde path for this item) and covered + by tests. +- Frame metadata is exposed to deserialization through a documented, tested + context contract that supports version negotiation. +- Unit tests use `rstest`; behavioural tests use `rstest-bdd` v0.5.0. +- `docs/adr-005-serializer-abstraction.md` and `docs/users-guide.md` are + updated, and `docs/roadmap.md` marks `9.5.1` as done. +- All commands listed in `Concrete steps` pass. + +## Idempotence and recovery + +- All validation commands are safe to re-run. +- If trait-bound changes cause widespread compile errors, first add temporary + compatibility shims, then migrate call sites incrementally. +- If metadata context propagation breaks existing serializers, keep a default + empty-context deserialization path while migrating serializers one by one. +- If behavioural tests become flaky, capture deterministic fixtures and inputs + in world state and convert them into stable regression scenarios. +- If docs drift from implementation, stop roadmap completion until docs and + code are aligned. + +## Artifacts and notes + +Keep the following artifacts during implementation: + +- `/tmp/9-5-1-*.log` command logs. +- Final adapter trait signatures and context type definitions. +- List of migrated call sites that previously required `message::Message`. +- Names of new rstest unit tests and rstest-bdd scenarios. +- Migration examples added to `docs/users-guide.md`. + +## Interfaces and dependencies + +Target interface shape (names may vary slightly, intent must remain): + +`DeserializeContext` fields must be projections of the canonical metadata model +returned by `FrameMetadata::parse` (`FrameMetadata::Frame`). If the metadata +model changes, update this context shape in lockstep (or expose that model +directly) to avoid drift. + +```rust +pub struct DeserializeContext<'a> { + // Keep these convenience fields aligned with identifiers extracted from + // FrameMetadata::Frame in the active serializer implementation. + pub frame_metadata: Option<&'a [u8]>, + pub message_id: Option, + pub correlation_id: Option, + pub metadata_bytes_consumed: Option, +} + +pub trait EncodeWith { + fn encode_with( + &self, + serializer: &S, + ) -> Result, Box>; +} + +pub trait DecodeWith: Sized { + fn decode_with( + serializer: &S, + bytes: &[u8], + context: &DeserializeContext<'_>, + ) -> Result<(Self, usize), Box>; +} + +pub trait Serializer { + fn serialize(&self, value: &M) + -> Result, Box> + where + M: EncodeWith, + Self: Sized; + + fn deserialize_with_context( + &self, + bytes: &[u8], + context: &DeserializeContext<'_>, + ) -> Result<(M, usize), Box> + where + M: DecodeWith, + Self: Sized; +} +``` + +Dependency expectations: + +- No new mandatory dependencies. +- Continue using existing `rstest`, `rstest-bdd`, and `rstest-bdd-macros` + versions already pinned in `Cargo.toml`. +- Optional bridge implementation should prefer existing `serde` dependency and + feature-gating in crate features. + +## Revision note + +2026-02-21: Initial draft created for roadmap item `9.5.1`, defining staged +implementation, serializer-adapter direction, metadata-context requirements, +test strategy, migration documentation scope, and roadmap completion criteria. diff --git a/docs/roadmap.md b/docs/roadmap.md index de385954..82cd2b34 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -411,14 +411,14 @@ integration boundaries. ### 9.5. Serializer boundaries and protocol metadata -- [ ] 9.5.1. Decouple message encoding from `bincode`-specific traits to +- [x] 9.5.1. Decouple message encoding from `bincode`-specific traits to support alternative serializers.[^router-design][^adr-005] - - [ ] Introduce a serializer-agnostic message trait or adaptor layer for + - [x] Introduce a serializer-agnostic message trait or adaptor layer for `Message` types. - - [ ] Provide optional wire-rs or Serde bridges to reduce manual boilerplate. - - [ ] Define how frame metadata is exposed to the deserialization context to + - [x] Provide optional wire-rs or Serde bridges to reduce manual boilerplate. + - [x] Define how frame metadata is exposed to the deserialization context to enable version negotiation.[^message-versioning] - - [ ] Add migration guidance covering existing `bincode` users. + - [x] Add migration guidance covering existing `bincode` users. ### 9.6. Codec performance benchmarks diff --git a/docs/users-guide.md b/docs/users-guide.md index 2742bc2e..7730da82 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -33,7 +33,8 @@ Vocabulary rules: - Use `envelope` for routable wrappers and instances; use `packet` when discussing the `Packet` trait abstraction or `PacketParts`. - Use `message` for typed application payloads implementing - `wireframe::message::Message`. + `wireframe::message::EncodeWith`/`wireframe::message::DecodeWith` (or + `wireframe::message::Message` for bincode-compatible types). - Use `fragment` only for transport-level split/reassembly units. For invariants and naming rules used across internal modules, see the @@ -379,9 +380,41 @@ that meets the trait bounds.[^4] When `FrameMetadata::parse` succeeds, the framework extracts identifiers from metadata without deserializing the payload. If parsing fails, it falls back to -full deserialization.[^9][^6] Application messages implement the `Message` -trait, gaining `to_bytes` and `from_bytes` helpers that use bincode with the -standard configuration.[^10] +full deserialization.[^9][^6] Serializer integration now uses adapter traits: +`EncodeWith` for encoding and `DecodeWith` for decoding. Existing bincode +message types remain compatible through `Message`, which still provides +`to_bytes` and `from_bytes` helpers.[^10] + +### Migration from `Message`-only bounds + +If existing code previously relied on `M: Message` for client/server API calls, +no change is required for bincode-compatible types. + +- Existing `#[derive(bincode::Encode, bincode::BorrowDecode)]` payloads still + work in `send`, `receive`, `call`, and `send_response`. +- Serializer implementations should update method signatures to use + `EncodeWith` and `DecodeWith` bounds. +- Custom serializers that rely on legacy `Message`-derived payload encoding + should also implement `wireframe::serializer::MessageCompatibilitySerializer`. +- Metadata-aware serializers can implement + `Serializer::deserialize_with_context` to inspect `DeserializeContext`. +- `Serializer` is not object-safe (`Self: Sized` on serializer entry points), so + using `dyn Serializer` directly is unsupported unless you provide concrete + wrappers. For migration, keep concrete serializer types in `EncodeWith` / + `DecodeWith` bounds, retain + `wireframe::serializer::MessageCompatibilitySerializer` for legacy + `Message`-based payloads, and use `SerdeMessage` with `SerdeSerializerBridge` + for Serde-only payloads. When runtime-polymorphic serializer behaviour is + required, wrap concrete serializer implementations in adapter wrappers that + expose object-safe APIs and delegate to + `Serializer::deserialize_with_context` as needed. + +Optional Serde bridge support is available behind the feature +`serializer-serde`. Wrap values with `SerdeMessage` (or +`into_serde_message()`) and implement `SerdeSerializerBridge` on the serializer +to reduce per-type adapter boilerplate. This explicit wrapper is required +because blanket Serde adapters would overlap with the default `T: Message` +adapter implementations. ### Fragmentation metadata diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 7dec6543..46e78ef0 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -12,8 +12,8 @@ use wireframe::{ app::Envelope, byte_order::{read_network_u16, write_network_u16}, frame::FrameMetadata, - message::Message, - serializer::Serializer, + message::{DecodeWith, DeserializeContext, EncodeWith, Message}, + serializer::{MessageCompatibilitySerializer, Serializer}, }; type App = wireframe::app::WireframeApp; @@ -24,21 +24,24 @@ const MAX_FRAME: usize = 64 * 1024; #[derive(Default)] struct HeaderSerializer; +impl MessageCompatibilitySerializer for HeaderSerializer {} + impl Serializer for HeaderSerializer { - fn serialize( - &self, - value: &M, - ) -> Result, Box> { - value - .to_bytes() - .map_err(|e| Box::new(e) as Box) + fn serialize(&self, value: &M) -> Result, Box> + where + M: EncodeWith, + { + value.encode_with(self) } - fn deserialize( + fn deserialize( &self, bytes: &[u8], - ) -> Result<(M, usize), Box> { - M::from_bytes(bytes).map_err(|e| Box::new(e) as Box) + ) -> Result<(M, usize), Box> + where + M: DecodeWith, + { + M::decode_with(self, bytes, &DeserializeContext::empty()) } } diff --git a/src/app/codec_driver.rs b/src/app/codec_driver.rs index 560bb602..65feb9ce 100644 --- a/src/app/codec_driver.rs +++ b/src/app/codec_driver.rs @@ -25,6 +25,7 @@ use super::{ use crate::{ codec::FrameCodec, fragment::{FragmentationConfig, FragmentationError}, + message::EncodeWith, serializer::Serializer, }; @@ -126,6 +127,7 @@ where S: Serializer + Send + Sync, W: AsyncRead + AsyncWrite + Unpin, F: FrameCodec, + Envelope: EncodeWith, { let bytes = serializer.serialize(envelope).map_err(|e| { let id = envelope.id; @@ -170,6 +172,7 @@ where S: Serializer + Send + Sync, W: AsyncRead + AsyncWrite + Unpin, F: FrameCodec, + Envelope: EncodeWith, { for envelope in envelopes.drain(..) { send_envelope(serializer, codec, framed, &envelope).await?; diff --git a/src/app/envelope.rs b/src/app/envelope.rs index dbdff511..ceba4f4b 100644 --- a/src/app/envelope.rs +++ b/src/app/envelope.rs @@ -9,7 +9,6 @@ use crate::{ correlation::CorrelatableFrame, fragment::{FragmentParts, Fragmentable}, - message::Message, }; /// Envelope-like type used to wrap incoming and outgoing messages. @@ -24,7 +23,6 @@ use crate::{ /// use wireframe::{ /// app::{Packet, PacketParts}, /// correlation::CorrelatableFrame, -/// message::Message, /// }; /// /// #[derive(bincode::BorrowDecode, bincode::Encode)] @@ -61,7 +59,7 @@ use crate::{ /// } /// } /// ``` -pub trait Packet: CorrelatableFrame + Message + Send + Sync + 'static { +pub trait Packet: CorrelatableFrame + Send + Sync + 'static { /// Return the message identifier used for routing. fn id(&self) -> u32; @@ -99,8 +97,6 @@ pub trait Packet: CorrelatableFrame + Message + Send + Sync + 'static { /// fn set_correlation_id(&mut self, cid: Option) { self.correlation_id = cid; } /// } /// - /// // Message is auto-implemented via the blanket impl for Encode + BorrowDecode types. - /// /// impl Packet for MyEnvelope { /// fn id(&self) -> u32 { self.id } /// fn into_parts(self) -> PacketParts { diff --git a/src/app/frame_handling/response.rs b/src/app/frame_handling/response.rs index bf1671d0..45a05456 100644 --- a/src/app/frame_handling/response.rs +++ b/src/app/frame_handling/response.rs @@ -8,6 +8,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ app::{Envelope, Packet, PacketParts, codec_driver::flush_pipeline_output}, codec::FrameCodec, + message::EncodeWith, middleware::{HandlerService, Service, ServiceRequest}, serializer::Serializer, }; @@ -35,6 +36,7 @@ where E: Packet, W: AsyncRead + AsyncWrite + Unpin, F: FrameCodec, + Envelope: EncodeWith, { let request = ServiceRequest::new(env.payload, env.correlation_id); let resp = match service.call(request).await { diff --git a/src/app/inbound_handler.rs b/src/app/inbound_handler.rs index 2952cf0e..427b899d 100644 --- a/src/app/inbound_handler.rs +++ b/src/app/inbound_handler.rs @@ -22,7 +22,7 @@ use super::{ use crate::{ codec::{FrameCodec, LengthDelimitedFrameCodec, MAX_FRAME_LENGTH, clamp_frame_length}, frame::FrameMetadata, - message::Message, + message::{DecodeWith, DeserializeContext, EncodeWith}, message_assembler::MessageAssemblyState, middleware::HandlerService, serializer::Serializer, @@ -71,7 +71,7 @@ where ) -> std::result::Result<(), SendError> where W: AsyncWrite + Unpin, - M: Message, + M: EncodeWith, { let bytes = self .serializer @@ -103,7 +103,7 @@ where ) -> std::result::Result<(), SendError> where W: AsyncRead + AsyncWrite + Unpin, - M: Message, + M: EncodeWith, Cc: Encoder, { let bytes = self @@ -142,7 +142,7 @@ where ) -> std::result::Result<(), SendError> where W: AsyncRead + AsyncWrite + Unpin, - M: Message, + M: EncodeWith, { let bytes = self .serializer @@ -158,6 +158,7 @@ where C: Send + 'static, E: Packet, F: FrameCodec, + Envelope: DecodeWith + EncodeWith, { /// Try parsing the frame using [`FrameMetadata::parse`], falling back to /// full deserialization on failure. @@ -165,10 +166,23 @@ where &self, payload: &[u8], ) -> std::result::Result<(Envelope, usize), Box> { - self.serializer - .parse(payload) - .map_err(Box::::from) - .or_else(|_| self.serializer.deserialize::(payload)) + match self.serializer.parse(payload) { + Ok((parsed_envelope, metadata_bytes_consumed)) => { + if !self.serializer.should_deserialize_after_parse() { + return Ok((parsed_envelope, metadata_bytes_consumed)); + } + + let context = DeserializeContext { + frame_metadata: payload.get(..metadata_bytes_consumed), + message_id: Some(parsed_envelope.id), + correlation_id: parsed_envelope.correlation_id, + metadata_bytes_consumed: Some(metadata_bytes_consumed), + }; + self.serializer + .deserialize_with_context::(payload, &context) + } + Err(_) => self.serializer.deserialize::(payload), + } } /// Handle an accepted connection end-to-end, returning any processing error. diff --git a/src/client/messaging.rs b/src/client/messaging.rs index 305dd6ae..e8f23a8b 100644 --- a/src/client/messaging.rs +++ b/src/client/messaging.rs @@ -9,7 +9,11 @@ use bytes::Bytes; use futures::{SinkExt, StreamExt}; use super::{ClientError, runtime::ClientStream}; -use crate::{app::Packet, message::Message, serializer::Serializer}; +use crate::{ + app::Packet, + message::{DecodeWith, EncodeWith}, + serializer::Serializer, +}; /// Extension trait providing envelope-based messaging methods. /// @@ -104,7 +108,10 @@ where /// # Ok(()) /// # } /// ``` - pub async fn send_envelope(&mut self, mut envelope: P) -> Result { + pub async fn send_envelope

(&mut self, mut envelope: P) -> Result + where + P: Packet + EncodeWith, + { // Check once whether correlation ID is present. let existing = envelope.correlation_id(); let correlation_id = existing.unwrap_or_else(|| self.next_correlation_id()); @@ -163,7 +170,10 @@ where /// # Ok(()) /// # } /// ``` - pub async fn receive_envelope(&mut self) -> Result { + pub async fn receive_envelope

(&mut self) -> Result + where + P: Packet + DecodeWith, + { self.receive_internal().await } @@ -202,7 +212,10 @@ where /// # Ok(()) /// # } /// ``` - pub async fn call_correlated(&mut self, request: P) -> Result { + pub async fn call_correlated

(&mut self, request: P) -> Result + where + P: Packet + EncodeWith + DecodeWith, + { let correlation_id = self.send_envelope(request).await?; let response: P = self.receive_envelope().await?; @@ -221,7 +234,7 @@ where } /// Internal helper for receiving and deserializing a frame. - pub(crate) async fn receive_internal(&mut self) -> Result { + pub(crate) async fn receive_internal>(&mut self) -> Result { let Some(frame) = self.framed.next().await else { let err = ClientError::disconnected(); self.invoke_error_hook(&err).await; diff --git a/src/client/response_stream.rs b/src/client/response_stream.rs index 3215049c..6e69d00f 100644 --- a/src/client/response_stream.rs +++ b/src/client/response_stream.rs @@ -14,7 +14,7 @@ use std::{ use futures::Stream; use super::{ClientError, runtime::ClientStream}; -use crate::{app::Packet, serializer::Serializer}; +use crate::{app::Packet, message::DecodeWith, serializer::Serializer}; /// An async stream of typed frames received from a streaming server response. /// @@ -60,7 +60,7 @@ use crate::{app::Packet, serializer::Serializer}; /// ``` pub struct ResponseStream<'a, P, S, T, C> where - P: Packet, + P: Packet + DecodeWith, S: Serializer + Send + Sync, T: ClientStream, { @@ -72,7 +72,7 @@ where impl<'a, P, S, T, C> ResponseStream<'a, P, S, T, C> where - P: Packet, + P: Packet + DecodeWith, S: Serializer + Send + Sync, T: ClientStream, { @@ -134,7 +134,7 @@ where impl Stream for ResponseStream<'_, P, S, T, C> where - P: Packet, + P: Packet + DecodeWith, S: Serializer + Send + Sync, T: ClientStream, { diff --git a/src/client/runtime.rs b/src/client/runtime.rs index 12c3673b..1ad2dc9d 100644 --- a/src/client/runtime.rs +++ b/src/client/runtime.rs @@ -17,7 +17,7 @@ use super::{ hooks::{ClientConnectionTeardownHandler, ClientErrorHandler}, }; use crate::{ - message::Message, + message::{DecodeWith, EncodeWith}, rewind_stream::RewindStream, serializer::{BincodeSerializer, Serializer}, }; @@ -136,7 +136,7 @@ where /// # Ok(()) /// # } /// ``` - pub async fn send(&mut self, message: &M) -> Result<(), ClientError> { + pub async fn send>(&mut self, message: &M) -> Result<(), ClientError> { let bytes = match self.serializer.serialize(message) { Ok(bytes) => bytes, Err(e) => { @@ -182,7 +182,7 @@ where /// # Ok(()) /// # } /// ``` - pub async fn receive(&mut self) -> Result { + pub async fn receive>(&mut self) -> Result { self.receive_internal().await } @@ -225,7 +225,7 @@ where /// # Ok(()) /// # } /// ``` - pub async fn call( + pub async fn call, Resp: DecodeWith>( &mut self, request: &Req, ) -> Result { diff --git a/src/client/streaming.rs b/src/client/streaming.rs index ab7943f3..00718b31 100644 --- a/src/client/streaming.rs +++ b/src/client/streaming.rs @@ -11,7 +11,11 @@ use bytes::Bytes; use futures::SinkExt; use super::{ClientError, ResponseStream, runtime::ClientStream}; -use crate::{app::Packet, serializer::Serializer}; +use crate::{ + app::Packet, + message::{DecodeWith, EncodeWith}, + serializer::Serializer, +}; impl super::WireframeClient where @@ -66,10 +70,13 @@ where /// # Ok(()) /// # } /// ``` - pub async fn call_streaming( + pub async fn call_streaming

( &mut self, mut request: P, - ) -> Result, ClientError> { + ) -> Result, ClientError> + where + P: Packet + EncodeWith + DecodeWith, + { let existing = request.correlation_id(); let correlation_id = existing.unwrap_or_else(|| self.correlation_counter.fetch_add(1, Ordering::Relaxed)); @@ -133,10 +140,10 @@ where /// # Ok(()) /// # } /// ``` - pub fn receive_streaming( - &mut self, - correlation_id: u64, - ) -> ResponseStream<'_, P, S, T, C> { + pub fn receive_streaming

(&mut self, correlation_id: u64) -> ResponseStream<'_, P, S, T, C> + where + P: Packet + DecodeWith, + { ResponseStream::new(self, correlation_id) } } diff --git a/src/client/tests/helpers.rs b/src/client/tests/helpers.rs index b6d7be36..50167c80 100644 --- a/src/client/tests/helpers.rs +++ b/src/client/tests/helpers.rs @@ -14,7 +14,7 @@ use tokio::net::{TcpListener, TcpStream}; use crate::{ client::{ClientError, WireframeClient, WireframeClientBuilder}, - serializer::{BincodeSerializer, Serializer}, + serializer::{BincodeSerializer, MessageCompatibilitySerializer, Serializer}, }; /// Type alias for async hooks that return their input after performing side effects. @@ -131,20 +131,25 @@ where /// A serializer that always fails to serialize, used for testing error hooks. pub struct FailingSerializer; +impl MessageCompatibilitySerializer for FailingSerializer {} + impl Serializer for FailingSerializer { - fn serialize( - &self, - _value: &M, - ) -> Result, Box> { + fn serialize(&self, _value: &M) -> Result, Box> + where + M: crate::message::EncodeWith, + { Err(Box::new(std::io::Error::other( "forced serialization failure", ))) } - fn deserialize( + fn deserialize( &self, _bytes: &[u8], - ) -> Result<(M, usize), Box> { + ) -> Result<(M, usize), Box> + where + M: crate::message::DecodeWith, + { Err(Box::new(std::io::Error::other( "forced deserialization failure", ))) diff --git a/src/message.rs b/src/message.rs index 7f0b8931..e591a5cc 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,6 +3,8 @@ //! Types implementing [`Message`] can be encoded and decoded using //! `bincode` with standard configuration. +use std::error::Error; + use bincode::{ BorrowDecode, Encode, @@ -12,6 +14,60 @@ use bincode::{ error::{DecodeError, EncodeError}, }; +/// Context supplied to metadata-aware deserializers. +/// +/// This structure carries the metadata fields extracted during +/// [`crate::frame::FrameMetadata::parse`] so serializers can select versioned +/// decode paths without depending on hidden global state. +#[derive(Clone, Copy, Debug, Default)] +pub struct DeserializeContext<'a> { + /// Raw metadata bytes extracted from the frame, if available. + pub frame_metadata: Option<&'a [u8]>, + /// Parsed message identifier from metadata, if available. + pub message_id: Option, + /// Parsed correlation identifier from metadata, if available. + pub correlation_id: Option, + /// Number of source bytes consumed while parsing metadata. + pub metadata_bytes_consumed: Option, +} + +impl DeserializeContext<'_> { + /// Return an empty context. + #[must_use] + pub const fn empty() -> Self { + Self { + frame_metadata: None, + message_id: None, + correlation_id: None, + metadata_bytes_consumed: None, + } + } +} + +/// Serializer-agnostic encoding adapter used by [`crate::serializer::Serializer`]. +pub trait EncodeWith { + /// Encode `self` with `serializer`. + /// + /// # Errors + /// + /// Returns an error when encoding fails. + fn encode_with(&self, serializer: &S) -> Result, Box>; +} + +/// Serializer-agnostic decoding adapter used by [`crate::serializer::Serializer`]. +pub trait DecodeWith: Sized { + /// Decode `Self` from `bytes` with optional metadata context. + /// + /// # Errors + /// + /// Returns an error when decoding fails. + fn decode_with( + serializer: &S, + bytes: &[u8], + context: &DeserializeContext<'_>, + ) -> Result<(Self, usize), Box>; +} + /// Wrapper trait for application message types. /// /// Any type deriving [`Encode`] and [`BorrowDecode`] automatically implements @@ -56,3 +112,179 @@ pub trait Message: Encode + for<'de> BorrowDecode<'de, ()> { } impl Message for T where T: Encode + for<'de> BorrowDecode<'de, ()> {} + +impl EncodeWith for T +where + S: crate::serializer::MessageCompatibilitySerializer + ?Sized, + T: Message, +{ + fn encode_with(&self, _serializer: &S) -> Result, Box> { + self.to_bytes() + .map_err(|error| Box::new(error) as Box) + } +} + +impl DecodeWith for T +where + S: crate::serializer::MessageCompatibilitySerializer + ?Sized, + T: Message, +{ + fn decode_with( + _serializer: &S, + bytes: &[u8], + _context: &DeserializeContext<'_>, + ) -> Result<(Self, usize), Box> { + T::from_bytes(bytes).map_err(|error| Box::new(error) as Box) + } +} + +#[cfg(feature = "serializer-serde")] +pub mod serde_bridge { + //! Optional Serde wrapper adapters for serializer bridges. + //! + //! `SerdeMessage` exists because Rust coherence prevents adding blanket + //! `EncodeWith`/`DecodeWith` impls for all `T: Serialize + + //! DeserializeOwned`: those impls would overlap with the default + //! `T: Message` adapters. + + use serde::{Serialize, de::DeserializeOwned}; + + use super::{DecodeWith, DeserializeContext, EncodeWith}; + use crate::serializer::SerdeSerializerBridge; + + /// Wrapper providing `EncodeWith`/`DecodeWith` implementations via Serde. + /// + /// This wrapper is the explicit opt-in path for Serde-only payloads. + #[derive(Clone, Debug, PartialEq, Eq)] + pub struct SerdeMessage(T); + + impl SerdeMessage { + /// Wrap a value for Serde bridge encoding. + #[must_use] + pub const fn new(value: T) -> Self { Self(value) } + + /// Borrow the wrapped value. + #[must_use] + pub const fn as_ref(&self) -> &T { &self.0 } + + /// Consume the wrapper and return the inner value. + #[must_use] + pub fn into_inner(self) -> T { self.0 } + } + + impl From for SerdeMessage { + fn from(value: T) -> Self { Self(value) } + } + + impl EncodeWith for SerdeMessage + where + S: SerdeSerializerBridge + ?Sized, + T: Serialize, + { + fn encode_with( + &self, + serializer: &S, + ) -> Result, Box> { + serializer.serialize_serde(self.as_ref()) + } + } + + impl DecodeWith for SerdeMessage + where + S: SerdeSerializerBridge + ?Sized, + T: DeserializeOwned, + { + fn decode_with( + serializer: &S, + bytes: &[u8], + context: &DeserializeContext<'_>, + ) -> Result<(Self, usize), Box> { + let (decoded, consumed) = serializer.deserialize_serde(bytes, context)?; + Ok((Self::new(decoded), consumed)) + } + } + + /// Helper trait to wrap values ergonomically for Serde bridge calls. + pub trait IntoSerdeMessage: Sized { + /// Wrap `self` as a [`SerdeMessage`]. + fn into_serde_message(self) -> SerdeMessage { SerdeMessage::new(self) } + } + + impl IntoSerdeMessage for T {} +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::{DecodeWith, DeserializeContext, EncodeWith}; + use crate::serializer::BincodeSerializer; + + #[derive(bincode::Decode, bincode::Encode, Debug, PartialEq, Eq)] + struct RoundTripMessage { + value: u32, + } + + #[rstest] + fn bincode_adapter_round_trip() { + let serializer = BincodeSerializer; + let original = RoundTripMessage { value: 41 }; + + let bytes = original + .encode_with(&serializer) + .expect("bincode adapter should encode message"); + let (decoded, consumed) = + RoundTripMessage::decode_with(&serializer, &bytes, &DeserializeContext::empty()) + .expect("bincode adapter should decode encoded message"); + + assert_eq!(decoded, original); + assert_eq!(consumed, bytes.len()); + } + + #[rstest] + fn deserialize_context_empty_defaults_to_none() { + let context = DeserializeContext::empty(); + assert!(context.frame_metadata.is_none()); + assert!(context.message_id.is_none()); + assert!(context.correlation_id.is_none()); + assert!(context.metadata_bytes_consumed.is_none()); + } + + #[cfg(feature = "serializer-serde")] + mod serde_bridge_tests { + use rstest::rstest; + use serde::{Deserialize, Serialize}; + + use super::super::{ + DecodeWith, + DeserializeContext, + EncodeWith, + serde_bridge::{IntoSerdeMessage, SerdeMessage}, + }; + use crate::serializer::BincodeSerializer; + + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] + struct SerdePayload { + id: u32, + } + + #[rstest] + fn serde_message_round_trip_with_bincode_bridge() { + let serializer = BincodeSerializer; + let original = SerdePayload { id: 9 }.into_serde_message(); + + let bytes = original + .encode_with(&serializer) + .expect("serde bridge should encode payload"); + let (decoded, consumed) = SerdeMessage::::decode_with( + &serializer, + &bytes, + &DeserializeContext::empty(), + ) + .expect("serde bridge should decode encoded payload"); + + assert_eq!(decoded.into_inner().id, 9); + assert_eq!(consumed, bytes.len()); + } + } +} diff --git a/src/prelude.rs b/src/prelude.rs index 4bdd9895..3ea8e104 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -11,12 +11,14 @@ //! fn build() -> Result { WireframeApp::new() } //! ``` +#[cfg(feature = "serializer-serde")] +pub use crate::message::serde_bridge::{IntoSerdeMessage, SerdeMessage}; pub use crate::{ app::{Envelope, Handler, Middleware, WireframeApp}, error::{Result, WireframeError}, - message::Message, + message::{DecodeWith, DeserializeContext, EncodeWith, Message}, response::Response, - serializer::{BincodeSerializer, Serializer}, + serializer::{BincodeSerializer, MessageCompatibilitySerializer, Serializer}, }; #[cfg(not(loom))] pub use crate::{ diff --git a/src/serializer.rs b/src/serializer.rs index 6646c4bb..7734b468 100644 --- a/src/serializer.rs +++ b/src/serializer.rs @@ -6,45 +6,145 @@ use std::error::Error; -use crate::{frame::FrameMetadata, message::Message}; +#[cfg(feature = "serializer-serde")] +use bincode::config; +#[cfg(feature = "serializer-serde")] +use bincode::serde::{decode_from_slice, encode_to_vec}; + +use crate::{ + frame::FrameMetadata, + message::{DecodeWith, DeserializeContext, EncodeWith, Message}, +}; + +/// Marker trait for serializers that opt into legacy [`Message`] compatibility. +/// +/// Implement this trait when `T: Message` values should automatically satisfy +/// `EncodeWith` and `DecodeWith`. +pub trait MessageCompatibilitySerializer {} + +/// Optional bridge trait used by `message::serde_bridge`. +#[cfg(feature = "serializer-serde")] +pub trait SerdeSerializerBridge { + /// Serialize a Serde value into bytes. + /// + /// # Errors + /// + /// Returns an error if serialization fails. + fn serialize_serde( + &self, + value: &T, + ) -> Result, Box>; + + /// Deserialize a Serde value from bytes with optional metadata context. + /// + /// # Errors + /// + /// Returns an error if deserialization fails. + fn deserialize_serde( + &self, + bytes: &[u8], + context: &DeserializeContext<'_>, + ) -> Result<(T, usize), Box>; +} /// Trait for serializing and deserializing messages. +/// +/// # Object Safety +/// +/// This trait is not object-safe. Its core methods include `Self: Sized` +/// bounds, so `dyn Serializer` cannot call `serialize`, `deserialize`, or +/// `deserialize_with_context`. +/// +/// Use concrete serializer types (for example `BincodeSerializer`) in API +/// bounds. If runtime selection is required, introduce an explicit type-erased +/// wrapper that provides object-safe forwarding methods. pub trait Serializer { /// Serialize `value` into a byte vector. /// /// # Errors /// /// Returns an error if the value cannot be serialized. - fn serialize(&self, value: &M) -> Result, Box>; + fn serialize(&self, value: &M) -> Result, Box> + where + M: EncodeWith, + Self: Sized; /// Deserialize a message from `bytes`, returning the message and bytes consumed. /// /// # Errors /// /// Returns an error if the bytes cannot be parsed into a message. - fn deserialize( + fn deserialize(&self, bytes: &[u8]) -> Result<(M, usize), Box> + where + M: DecodeWith, + Self: Sized; + + /// Deserialize a message from `bytes` using parsed frame metadata context. + /// + /// The default implementation delegates to [`Serializer::deserialize`], + /// preserving existing serializers that are not metadata-aware. + /// + /// # Errors + /// + /// Returns an error if the bytes cannot be parsed into a message. + fn deserialize_with_context( &self, bytes: &[u8], - ) -> Result<(M, usize), Box>; + _context: &DeserializeContext<'_>, + ) -> Result<(M, usize), Box> + where + M: DecodeWith, + Self: Sized, + { + self.deserialize(bytes) + } + + /// Decide whether [`Serializer::deserialize_with_context`] should run after + /// a successful [`FrameMetadata::parse`] call on the inbound path. + /// + /// Returning `false` skips `deserialize_with_context` after parse succeeds. + /// This is useful when `parse` already performs full frame decoding and a + /// second decode pass would be redundant. + /// + /// This flag is consulted by the inbound handler in + /// `src/app/inbound_handler.rs` (`parse_envelope`). + #[must_use] + fn should_deserialize_after_parse(&self) -> bool { true } } /// Serializer using `bincode` with its standard configuration. #[derive(Clone, Copy, Debug, Default)] pub struct BincodeSerializer; +impl MessageCompatibilitySerializer for BincodeSerializer {} + impl Serializer for BincodeSerializer { - fn serialize(&self, value: &M) -> Result, Box> { - value - .to_bytes() - .map_err(|e| Box::new(e) as Box) + fn serialize(&self, value: &M) -> Result, Box> + where + M: EncodeWith, + { + value.encode_with(self) + } + + fn deserialize(&self, bytes: &[u8]) -> Result<(M, usize), Box> + where + M: DecodeWith, + { + M::decode_with(self, bytes, &DeserializeContext::empty()) } - fn deserialize( + fn deserialize_with_context( &self, bytes: &[u8], - ) -> Result<(M, usize), Box> { - M::from_bytes(bytes).map_err(|e| Box::new(e) as Box) + context: &DeserializeContext<'_>, + ) -> Result<(M, usize), Box> + where + M: DecodeWith, + { + M::decode_with(self, bytes, context) } + + fn should_deserialize_after_parse(&self) -> bool { false } } impl FrameMetadata for BincodeSerializer { @@ -55,3 +155,23 @@ impl FrameMetadata for BincodeSerializer { crate::app::Envelope::from_bytes(src) } } + +#[cfg(feature = "serializer-serde")] +impl SerdeSerializerBridge for BincodeSerializer { + fn serialize_serde( + &self, + value: &T, + ) -> Result, Box> { + encode_to_vec(value, config::standard()) + .map_err(|error| Box::new(error) as Box) + } + + fn deserialize_serde( + &self, + bytes: &[u8], + _context: &DeserializeContext<'_>, + ) -> Result<(T, usize), Box> { + decode_from_slice(bytes, config::standard()) + .map_err(|error| Box::new(error) as Box) + } +} diff --git a/src/server/connection_spawner.rs b/src/server/connection_spawner.rs index 0cc79770..ed19e69b 100644 --- a/src/server/connection_spawner.rs +++ b/src/server/connection_spawner.rs @@ -11,6 +11,7 @@ use crate::{ app::{Envelope, Packet}, codec::FrameCodec, frame::FrameMetadata, + message::{DecodeWith, EncodeWith}, preamble::{Preamble, read_preamble}, rewind_stream::RewindStream, serializer::Serializer, @@ -30,6 +31,7 @@ pub(super) fn spawn_connection_task( Ctx: Send + 'static, E: Packet + 'static, Codec: FrameCodec, + Envelope: DecodeWith + EncodeWith, { let peer_addr = match stream.peer_addr() { Ok(addr) => Some(addr), @@ -64,6 +66,7 @@ async fn process_stream( Ctx: Send + 'static, E: Packet + 'static, Codec: FrameCodec, + Envelope: DecodeWith + EncodeWith, { let PreambleHooks { on_success, @@ -92,6 +95,7 @@ where Ctx: Send + 'static, E: Packet + 'static, Codec: FrameCodec, + Envelope: DecodeWith + EncodeWith, { match factory.build() { Ok(app) => { diff --git a/src/server/runtime.rs b/src/server/runtime.rs index cba93782..e042f259 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -21,6 +21,7 @@ use crate::{ app::{Envelope, Packet}, codec::FrameCodec, frame::FrameMetadata, + message::{DecodeWith, EncodeWith}, preamble::Preamble, serializer::Serializer, }; @@ -33,6 +34,7 @@ where Ctx: Send + 'static, E: Packet, Codec: FrameCodec, + Envelope: DecodeWith + EncodeWith, { /// Run the server until a shutdown signal is received. /// diff --git a/src/server/runtime/accept.rs b/src/server/runtime/accept.rs index d8ec7bea..6279d033 100644 --- a/src/server/runtime/accept.rs +++ b/src/server/runtime/accept.rs @@ -16,6 +16,7 @@ use crate::{ app::{Envelope, Packet}, codec::FrameCodec, frame::FrameMetadata, + message::{DecodeWith, EncodeWith}, preamble::Preamble, serializer::Serializer, server::{ @@ -157,6 +158,7 @@ pub(in crate::server) async fn accept_loop( Ctx: Send + 'static, E: Packet + 'static, Codec: FrameCodec, + Envelope: DecodeWith + EncodeWith, { let AcceptLoopOptions { preamble, @@ -203,6 +205,7 @@ where Ctx: Send + 'static, E: Packet + 'static, Codec: FrameCodec, + Envelope: DecodeWith + EncodeWith, { select! { biased; diff --git a/tests/common/context_capturing_serializer.rs b/tests/common/context_capturing_serializer.rs new file mode 100644 index 00000000..f973843c --- /dev/null +++ b/tests/common/context_capturing_serializer.rs @@ -0,0 +1,95 @@ +//! Shared test serializer that captures deserialization context metadata. + +use std::sync::{Arc, Mutex}; + +use wireframe::{ + app::Envelope, + frame::FrameMetadata, + message::{DecodeWith, DeserializeContext, EncodeWith}, + serializer::{BincodeSerializer, MessageCompatibilitySerializer, Serializer}, +}; + +/// Captured metadata from [`DeserializeContext`]. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct CapturedDeserializeContext { + /// Parsed message identifier from metadata, if available. + pub message_id: Option, + /// Parsed correlation identifier from metadata, if available. + pub correlation_id: Option, + /// Number of source bytes consumed while parsing metadata. + pub metadata_bytes_consumed: Option, + /// Raw frame metadata length, if available. + pub frame_metadata_len: Option, +} + +impl CapturedDeserializeContext { + /// Build a captured snapshot from `context`. + #[must_use] + pub fn from_context(context: &DeserializeContext<'_>) -> Self { + Self { + message_id: context.message_id, + correlation_id: context.correlation_id, + metadata_bytes_consumed: context.metadata_bytes_consumed, + frame_metadata_len: context.frame_metadata.map(<[u8]>::len), + } + } +} + +/// Test serializer that stores the latest deserialization context. +#[derive(Default)] +pub struct ContextCapturingSerializer { + captured: Arc>>, +} + +impl ContextCapturingSerializer { + /// Construct a serializer that writes into `captured`. + #[must_use] + pub fn new(captured: Arc>>) -> Self { + Self { captured } + } +} + +impl MessageCompatibilitySerializer for ContextCapturingSerializer {} + +impl Serializer for ContextCapturingSerializer { + fn serialize(&self, value: &M) -> Result, Box> + where + M: EncodeWith, + { + value.encode_with(self) + } + + fn deserialize( + &self, + bytes: &[u8], + ) -> Result<(M, usize), Box> + where + M: DecodeWith, + { + M::decode_with(self, bytes, &DeserializeContext::empty()) + } + + fn deserialize_with_context( + &self, + bytes: &[u8], + context: &DeserializeContext<'_>, + ) -> Result<(M, usize), Box> + where + M: DecodeWith, + { + let mut state = self.captured.lock().map_err(|_| { + "ContextCapturingSerializer::deserialize_with_context captured mutex poisoned" + })?; + *state = Some(CapturedDeserializeContext::from_context(context)); + M::decode_with(self, bytes, context) + } +} + +impl FrameMetadata for ContextCapturingSerializer { + type Frame = Envelope; + type Error = bincode::error::DecodeError; + + fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error> { + BincodeSerializer.parse(src) + } +} diff --git a/tests/connection_actor_errors.rs b/tests/connection_actor_errors.rs index 881ad105..cf2d1eeb 100644 --- a/tests/connection_actor_errors.rs +++ b/tests/connection_actor_errors.rs @@ -48,7 +48,7 @@ async fn before_send_hook_modifies_frames( shutdown_token: CancellationToken, ) -> TestResult { let (queues, handle) = queues?; - push_expect!(handle.push_high_priority(1), "push high-priority"); + push_expect!(handle.push_high_priority(1), "push high-priority")?; let stream = stream::iter(vec![Ok(2u8)]); let hooks = ProtocolHooks { diff --git a/tests/connection_actor_fairness.rs b/tests/connection_actor_fairness.rs index 31a26b4f..c9711c01 100644 --- a/tests/connection_actor_fairness.rs +++ b/tests/connection_actor_fairness.rs @@ -38,8 +38,8 @@ async fn strict_priority_order( shutdown_token: CancellationToken, ) -> TestResult { let (queues, handle) = queues?; - push_expect!(handle.push_low_priority(2), "push low-priority"); - push_expect!(handle.push_high_priority(1), "push high-priority"); + push_expect!(handle.push_low_priority(2), "push low-priority")?; + push_expect!(handle.push_high_priority(1), "push high-priority")?; let stream = stream::iter(vec![Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = @@ -67,9 +67,9 @@ async fn fairness_yields_low_after_burst( }; for n in 1..=5 { - push_expect!(handle.push_high_priority(n), "push high-priority"); + push_expect!(handle.push_high_priority(n), "push high-priority")?; } - push_expect!(handle.push_low_priority(99), "push low-priority"); + push_expect!(handle.push_low_priority(99), "push low-priority")?; let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown_token); @@ -112,7 +112,7 @@ async fn queue_frames( push_expect!( handle.push_high_priority(next_high), format!("push high-priority frame {next_high}") - ); + )?; highs.push(next_high); next_high += 1; } @@ -120,7 +120,7 @@ async fn queue_frames( push_expect!( handle.push_low_priority(next_low), format!("push low-priority frame {next_low}") - ); + )?; lows.push(next_low); next_low += 1; } @@ -220,13 +220,13 @@ async fn fairness_yields_low_with_time_slice( let _ = tx.send(out); }); - push_expect!(handle.push_high_priority(1), "push high-priority"); + push_expect!(handle.push_high_priority(1), "push high-priority")?; time::advance(Duration::from_millis(5)).await; - push_expect!(handle.push_high_priority(2), "push high-priority"); + push_expect!(handle.push_high_priority(2), "push high-priority")?; time::advance(Duration::from_millis(15)).await; - push_expect!(handle.push_low_priority(42), "push low-priority"); + push_expect!(handle.push_low_priority(42), "push low-priority")?; for n in 3..=5 { - push_expect!(handle.push_high_priority(n), "push high-priority"); + push_expect!(handle.push_high_priority(n), "push high-priority")?; } drop(handle); diff --git a/tests/connection_actor_shutdown.rs b/tests/connection_actor_shutdown.rs index 2751785e..d92c7a7b 100644 --- a/tests/connection_actor_shutdown.rs +++ b/tests/connection_actor_shutdown.rs @@ -54,9 +54,9 @@ async fn complete_draining_of_sources( wireframe::push::PushConfigError, >, shutdown_token: CancellationToken, -) { +) -> TestResult { let (queues, handle) = queues.expect("fixture should build queues"); - push_expect!(handle.push_high_priority(1), "push high-priority"); + push_expect!(handle.push_high_priority(1), "push high-priority")?; let stream = stream::iter(vec![Ok(2u8), Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = @@ -65,6 +65,7 @@ async fn complete_draining_of_sources( let mut out = Vec::new(); actor.run(&mut out).await.expect("actor run failed"); assert_eq!(out, vec![1, 2, 3]); + Ok(()) } #[rstest] @@ -102,19 +103,20 @@ async fn interleaved_shutdown_during_stream( #[rstest] #[tokio::test] #[serial] -async fn push_queue_exhaustion_backpressure() { +async fn push_queue_exhaustion_backpressure() -> TestResult { let (mut queues, handle) = PushQueues::::builder() .high_capacity(1) .low_capacity(1) .build() .expect("failed to build PushQueues"); - push_expect!(handle.push_high_priority(1), "push high-priority"); + push_expect!(handle.push_high_priority(1), "push high-priority")?; let blocked = timeout(Duration::from_millis(200), handle.push_high_priority(2)).await; assert!(blocked.is_err()); // clean up without exposing internal fields queues.close(); + Ok(()) } #[rstest] diff --git a/tests/features/serializer_boundaries.feature b/tests/features/serializer_boundaries.feature new file mode 100644 index 00000000..c5b5f2cc --- /dev/null +++ b/tests/features/serializer_boundaries.feature @@ -0,0 +1,15 @@ +@serializer_boundaries +Feature: Serializer boundaries and metadata context + Serializer boundaries should remain compatible with legacy bincode messages + while exposing metadata context to deserializers. + + Scenario: Legacy message round-trips through serializer-agnostic adapters + Given a legacy payload value 7 + When the legacy payload is encoded and decoded + Then the decoded legacy payload value is 7 + + Scenario: Metadata context is forwarded to deserialization + Given deserialize context message id 9 and correlation id 77 + When a context-aware serializer decodes with context + Then the captured message id is 9 + And the captured correlation id is 77 diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index b05281d3..ffe348aa 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -20,5 +20,6 @@ pub mod message_assembly_inbound; pub mod multi_packet; pub mod panic; pub mod request_parts; +pub mod serializer_boundaries; pub mod stream_end; pub mod unified_codec; diff --git a/tests/fixtures/serializer_boundaries.rs b/tests/fixtures/serializer_boundaries.rs new file mode 100644 index 00000000..66eb6f71 --- /dev/null +++ b/tests/fixtures/serializer_boundaries.rs @@ -0,0 +1,141 @@ +//! Fixture world for serializer boundary behavioural tests. + +use std::sync::{Arc, Mutex}; + +use rstest::fixture; +use wireframe::{ + app::Envelope, + message::DeserializeContext, + serializer::{BincodeSerializer, Serializer}, +}; + +#[path = "../common/context_capturing_serializer.rs"] +mod context_capturing_serializer; + +use context_capturing_serializer::{CapturedDeserializeContext, ContextCapturingSerializer}; +/// Shared result type used by serializer boundary fixtures and steps. +pub use wireframe_testing::TestResult; + +#[derive(bincode::Decode, bincode::Encode, Debug, PartialEq, Eq)] +struct LegacyPayload { + value: u32, +} + +/// Behavioural test world for serializer boundary scenarios. +#[derive(Default)] +pub struct SerializerBoundariesWorld { + legacy_value: Option, + decoded_legacy_value: Option, + context_message_id: Option, + context_correlation_id: Option, + captured_context: Arc>>, +} + +/// Fixture world for serializer boundary tests. +#[fixture] +pub fn serializer_boundaries_world() -> SerializerBoundariesWorld { + SerializerBoundariesWorld::default() +} + +impl SerializerBoundariesWorld { + /// Set the legacy value used for round-trip tests. + pub fn set_legacy_value(&mut self, value: u32) { self.legacy_value = Some(value); } + + /// # Errors + /// + /// Returns an error if a legacy input value was not set. + pub fn round_trip_legacy_payload(&mut self) -> TestResult { + let value = self.legacy_value.ok_or("legacy value not set")?; + let serializer = BincodeSerializer; + let payload = LegacyPayload { value }; + let bytes = serializer.serialize(&payload)?; + let (decoded, _) = serializer.deserialize::(&bytes)?; + self.decoded_legacy_value = Some(decoded.value); + Ok(()) + } + + /// # Errors + /// + /// Returns an error if no decoded legacy value has been produced. + pub fn assert_decoded_legacy_value(&self, expected: u32) -> TestResult { + let actual = self + .decoded_legacy_value + .ok_or("decoded legacy value not set")?; + if actual != expected { + return Err(format!("expected decoded legacy value {expected}, got {actual}").into()); + } + Ok(()) + } + + /// Set message and correlation ids used during deserialization. + pub fn set_deserialize_context(&mut self, message_id: u32, correlation_id: u64) { + self.context_message_id = Some(message_id); + self.context_correlation_id = Some(correlation_id); + } + + /// # Errors + /// + /// Returns an error if context values have not been set. + pub fn decode_with_context(&mut self) -> TestResult { + let message_id = self + .context_message_id + .ok_or("context message id not set")?; + let correlation_id = self + .context_correlation_id + .ok_or("context correlation id not set")?; + let envelope = Envelope::new(message_id, Some(correlation_id), vec![1, 2, 3]); + let encoded = BincodeSerializer.serialize(&envelope)?; + let serializer = ContextCapturingSerializer::new(self.captured_context.clone()); + + let context = DeserializeContext { + frame_metadata: Some(&encoded), + message_id: Some(message_id), + correlation_id: Some(correlation_id), + metadata_bytes_consumed: Some(encoded.len()), + }; + let _: (Envelope, usize) = serializer.deserialize_with_context(&encoded, &context)?; + Ok(()) + } + + /// # Errors + /// + /// Returns an error if no context was captured. + fn assert_captured_field( + &self, + field_extractor: impl FnOnce(&CapturedDeserializeContext) -> Option, + expected: T, + field_name: &str, + ) -> TestResult + where + T: PartialEq + std::fmt::Debug, + { + let captured = (*self + .captured_context + .lock() + .map_err(|_| "captured context mutex poisoned")?) + .ok_or("captured context not available")?; + let actual = field_extractor(&captured); + let expected_value = Some(expected); + if actual != expected_value { + return Err(format!( + "expected captured {field_name} {expected_value:?}, got {actual:?}" + ) + .into()); + } + Ok(()) + } + + /// # Errors + /// + /// Returns an error if no context was captured. + pub fn assert_captured_message_id(&self, expected: u32) -> TestResult { + self.assert_captured_field(|ctx| ctx.message_id, expected, "message id") + } + + /// # Errors + /// + /// Returns an error if no context was captured. + pub fn assert_captured_correlation_id(&self, expected: u64) -> TestResult { + self.assert_captured_field(|ctx| ctx.correlation_id, expected, "correlation id") + } +} diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index cf408b06..8f6d7c7a 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -164,7 +164,7 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() -> TestResult<()> { let out = run_app(app, vec![frame.to_vec()], None).await?; assert!(!out.is_empty(), "expected response frames"); - let frames = decode_frames(out); + let frames = decode_frames(out)?; let [first] = frames.as_slice() else { panic!("expected a single response frame"); }; diff --git a/tests/metadata.rs b/tests/metadata.rs index 6d500784..a7f7c322 100644 --- a/tests/metadata.rs +++ b/tests/metadata.rs @@ -5,16 +5,23 @@ use std::sync::{ Arc, + Mutex, atomic::{AtomicUsize, Ordering}, }; use wireframe::{ - app::Envelope, + app::{Envelope, Packet}, frame::FrameMetadata, - serializer::{BincodeSerializer, Serializer}, + message::DeserializeContext, + serializer::{BincodeSerializer, MessageCompatibilitySerializer, Serializer}, }; use wireframe_testing::{TestResult, TestSerializer, drive_with_bincode}; +#[path = "common/context_capturing_serializer.rs"] +mod context_capturing_serializer; + +use context_capturing_serializer::{CapturedDeserializeContext, ContextCapturingSerializer}; + type TestApp = wireframe::app::WireframeApp; fn mock_wireframe_app_with_serializer( @@ -27,22 +34,51 @@ where .route(1, Arc::new(|_| Box::pin(async {}))) } +macro_rules! impl_test_serializer_boilerplate { + ($serializer:ty) => { + fn serialize( + &self, + value: &M, + ) -> Result, Box> + where + M: wireframe::message::EncodeWith<$serializer>, + { + value.encode_with(self) + } + + fn deserialize( + &self, + bytes: &[u8], + ) -> Result<(M, usize), Box> + where + M: wireframe::message::DecodeWith<$serializer>, + { + M::decode_with(self, bytes, &DeserializeContext::empty()) + } + }; +} + #[derive(Default)] -struct CountingSerializer(Arc); +struct CountingSerializer(Arc, Arc); + +impl MessageCompatibilitySerializer for CountingSerializer {} impl Serializer for CountingSerializer { - fn serialize( - &self, - value: &M, - ) -> Result, Box> { - BincodeSerializer.serialize(value) - } + impl_test_serializer_boilerplate!(CountingSerializer); - fn deserialize( + fn deserialize_with_context( &self, - _bytes: &[u8], - ) -> Result<(M, usize), Box> { - Err("unexpected deserialize call".into()) + bytes: &[u8], + context: &DeserializeContext<'_>, + ) -> Result<(M, usize), Box> + where + M: wireframe::message::DecodeWith, + { + self.1.fetch_add(1, Ordering::Relaxed); + if context.message_id.is_none() { + return Err("expected message_id in deserialize context".into()); + } + M::decode_with(self, bytes, context) } } @@ -63,7 +99,8 @@ impl FrameMetadata for CountingSerializer { )] async fn metadata_parser_invoked_before_deserialize() -> TestResult<()> { let counter = Arc::new(AtomicUsize::new(0)); - let serializer = CountingSerializer(counter.clone()); + let deserialize_calls = Arc::new(AtomicUsize::new(0)); + let serializer = CountingSerializer(counter.clone(), deserialize_calls.clone()); let app = mock_wireframe_app_with_serializer(serializer)?; let env = Envelope::new(1, Some(0), vec![42]); @@ -71,27 +108,21 @@ async fn metadata_parser_invoked_before_deserialize() -> TestResult<()> { let out = drive_with_bincode(app, env).await?; assert!(!out.is_empty(), "no frames emitted"); assert_eq!(counter.load(Ordering::Relaxed), 1, "expected 1 parse call"); + assert_eq!( + deserialize_calls.load(Ordering::Relaxed), + 1, + "expected 1 deserialize call with context" + ); Ok(()) } #[derive(Default)] -struct FallbackSerializer(Arc, Arc); +struct FallbackSerializer(Arc); -impl Serializer for FallbackSerializer { - fn serialize( - &self, - value: &M, - ) -> Result, Box> { - BincodeSerializer.serialize(value) - } +impl MessageCompatibilitySerializer for FallbackSerializer {} - fn deserialize( - &self, - bytes: &[u8], - ) -> Result<(M, usize), Box> { - self.1.fetch_add(1, Ordering::Relaxed); - BincodeSerializer.deserialize(bytes) - } +impl Serializer for FallbackSerializer { + impl_test_serializer_boilerplate!(FallbackSerializer); } impl FrameMetadata for FallbackSerializer { @@ -111,8 +142,7 @@ impl FrameMetadata for FallbackSerializer { )] async fn falls_back_to_deserialize_after_parse_error() -> TestResult<()> { let parse_calls = Arc::new(AtomicUsize::new(0)); - let deser_calls = Arc::new(AtomicUsize::new(0)); - let serializer = FallbackSerializer(parse_calls.clone(), deser_calls.clone()); + let serializer = FallbackSerializer(parse_calls.clone()); let app = mock_wireframe_app_with_serializer(serializer)?; let env = Envelope::new(1, Some(0), vec![7]); @@ -124,10 +154,34 @@ async fn falls_back_to_deserialize_after_parse_error() -> TestResult<()> { 1, "expected 1 parse call" ); + Ok(()) +} + +#[tokio::test] +#[expect( + clippy::panic_in_result_fn, + reason = "asserts provide clearer diagnostics in tests" +)] +async fn metadata_is_forwarded_to_deserialize_context() -> TestResult<()> { + let context_state = Arc::new(Mutex::new(None::)); + let serializer = ContextCapturingSerializer::new(context_state.clone()); + let app = mock_wireframe_app_with_serializer(serializer)?; + + let envelope = Envelope::new(1, Some(77), vec![1, 2, 3, 4]); + let expected_parts = envelope.clone().into_parts(); + let output = drive_with_bincode(app, envelope.clone()).await?; + assert!(!output.is_empty(), "no frames emitted"); + + let captured = (*context_state + .lock() + .unwrap_or_else(|_| panic!("mutex poisoned while locking context_state"))) + .ok_or("expected captured deserialize context")?; + + assert_eq!(captured.message_id, Some(expected_parts.id())); + assert_eq!(captured.correlation_id, expected_parts.correlation_id()); assert_eq!( - deser_calls.load(Ordering::Relaxed), - 1, - "expected 1 deserialize call" + captured.frame_metadata_len, captured.metadata_bytes_consumed, + "metadata bytes consumed should match captured frame metadata slice length" ); Ok(()) } diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index e7774edb..43b3ad78 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -74,7 +74,7 @@ async fn middleware_applied_in_reverse_order() -> TestResult<()> { let serializer = BincodeSerializer; let bytes = serializer.serialize(&env)?; let mut codec = app.length_codec(); - let frame = encode_frame(&mut codec, bytes); + let frame = encode_frame(&mut codec, bytes)?; client.write_all(&frame).await?; client.shutdown().await?; @@ -84,7 +84,7 @@ async fn middleware_applied_in_reverse_order() -> TestResult<()> { client.read_to_end(&mut out).await?; handle.await??; - let frames = decode_frames(out); + let frames = decode_frames(out)?; let [first] = frames.as_slice() else { return Err("expected a single response frame".into()); }; diff --git a/tests/push.rs b/tests/push.rs index 041cc87a..0c053bac 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -70,8 +70,8 @@ async fn disables_throttling_allows_burst_pushes() -> TestResult<()> { .unlimited() .build()?; for i in 0u8..10 { - push_expect!(handle.push_high_priority(i)); - push_expect!(handle.push_low_priority(i)); + push_expect!(handle.push_high_priority(i))?; + push_expect!(handle.push_low_priority(i))?; } let res = time::timeout(Duration::from_millis(10), handle.push_high_priority(99)).await; let push_res = res.expect("push should not block when throttling disabled"); @@ -115,11 +115,11 @@ fn builder_rejects_zero_capacity() { async fn frames_routed_to_correct_priority_queues() -> TestResult<()> { let (mut queues, handle) = small_queues()?; - push_expect!(handle.push_low_priority(1u8)); - push_expect!(handle.push_high_priority(2u8)); + push_expect!(handle.push_low_priority(1u8))?; + push_expect!(handle.push_high_priority(2u8))?; - let (prio1, frame1) = recv_expect!(queues.recv()); - let (prio2, frame2) = recv_expect!(queues.recv()); + let (prio1, frame1) = recv_expect!(queues.recv())?; + let (prio2, frame2) = recv_expect!(queues.recv())?; assert_eq!( prio1, @@ -148,7 +148,7 @@ async fn frames_routed_to_correct_priority_queues() -> TestResult<()> { async fn try_push_respects_policy() -> TestResult<()> { let (mut queues, handle) = small_queues()?; - push_expect!(handle.push_high_priority(1u8)); + push_expect!(handle.push_high_priority(1u8))?; let result = handle.try_push(2u8, PushPriority::High, PushPolicy::ReturnErrorIfFull); assert!( matches!(result, Err(PushError::QueueFull)), @@ -157,8 +157,8 @@ async fn try_push_respects_policy() -> TestResult<()> { // drain queue to allow new push let _ = queues.recv().await; - push_expect!(handle.push_high_priority(3u8)); - let (_, last) = recv_expect!(queues.recv()); + push_expect!(handle.push_high_priority(3u8))?; + let (_, last) = recv_expect!(queues.recv())?; assert_eq!(last, 3, "unexpected drained frame"); Ok(()) } @@ -198,8 +198,8 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) -> Te let (mut queues, handle) = queues()?; match priority { - PushPriority::High => push_expect!(handle.push_high_priority(1u8)), - PushPriority::Low => push_expect!(handle.push_low_priority(1u8)), + PushPriority::High => push_expect!(handle.push_high_priority(1u8))?, + PushPriority::Low => push_expect!(handle.push_low_priority(1u8))?, } let mut fut = match priority { @@ -214,12 +214,12 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) -> Te time::advance(Duration::from_secs(1)).await; match priority { - PushPriority::High => push_expect!(handle.push_high_priority(3u8)), - PushPriority::Low => push_expect!(handle.push_low_priority(3u8)), + PushPriority::High => push_expect!(handle.push_high_priority(3u8))?, + PushPriority::Low => push_expect!(handle.push_low_priority(3u8))?, } - let (_, first) = recv_expect!(queues.recv()); - let (_, second) = recv_expect!(queues.recv()); + let (_, first) = recv_expect!(queues.recv())?; + let (_, second) = recv_expect!(queues.recv())?; assert_eq!( (first, second), (1, 3), @@ -237,12 +237,12 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) -> Te async fn rate_limiter_allows_after_wait() -> TestResult<()> { time::pause(); let (mut queues, handle) = queues()?; - push_expect!(handle.push_high_priority(1u8)); + push_expect!(handle.push_high_priority(1u8))?; time::advance(Duration::from_secs(1)).await; - push_expect!(handle.push_high_priority(2u8)); + push_expect!(handle.push_high_priority(2u8))?; - let (_, a) = recv_expect!(queues.recv()); - let (_, b) = recv_expect!(queues.recv()); + let (_, a) = recv_expect!(queues.recv())?; + let (_, b) = recv_expect!(queues.recv())?; assert_eq!((a, b), (1, 2), "unexpected frame ordering after wait"); Ok(()) } @@ -258,7 +258,7 @@ async fn rate_limiter_allows_after_wait() -> TestResult<()> { async fn rate_limiter_shared_across_priorities() -> TestResult<()> { time::pause(); let (mut queues, handle) = queues()?; - push_expect!(handle.push_high_priority(1u8)); + push_expect!(handle.push_high_priority(1u8))?; let mut fut = handle.push_low_priority(2u8).boxed(); tokio::task::yield_now().await; @@ -268,10 +268,10 @@ async fn rate_limiter_shared_across_priorities() -> TestResult<()> { ); time::advance(Duration::from_secs(1)).await; - push_expect!(handle.push_low_priority(2u8)); + push_expect!(handle.push_low_priority(2u8))?; - let (prio1, frame1) = recv_expect!(queues.recv()); - let (prio2, frame2) = recv_expect!(queues.recv()); + let (prio1, frame1) = recv_expect!(queues.recv())?; + let (prio2, frame2) = recv_expect!(queues.recv())?; assert_eq!(prio1, PushPriority::High, "first priority should be high"); assert_eq!(frame1, 1, "unexpected first frame value"); assert_eq!(prio2, PushPriority::Low, "second priority should be low"); @@ -288,12 +288,12 @@ async fn rate_limiter_shared_across_priorities() -> TestResult<()> { async fn unlimited_queues_do_not_block() -> TestResult<()> { time::pause(); let (mut queues, handle) = support::builder::().unlimited().build()?; - push_expect!(handle.push_high_priority(1u8)); + push_expect!(handle.push_high_priority(1u8))?; let res = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; assert!(res.is_ok(), "pushes should not block when unlimited"); - let (_, a) = recv_expect!(queues.recv()); - let (_, b) = recv_expect!(queues.recv()); + let (_, a) = recv_expect!(queues.recv())?; + let (_, b) = recv_expect!(queues.recv())?; assert_eq!((a, b), (1, 2), "unexpected ordering for unlimited queues"); Ok(()) } @@ -314,7 +314,7 @@ async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() -> TestRe .build()?; for i in 0u8..3 { - push_expect!(handle.push_high_priority(i)); + push_expect!(handle.push_high_priority(i))?; } let mut fut = handle.push_high_priority(99).boxed(); @@ -325,10 +325,10 @@ async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() -> TestRe ); time::advance(Duration::from_secs(1)).await; - push_expect!(handle.push_high_priority(100)); + push_expect!(handle.push_high_priority(100))?; for expected in [0u8, 1u8, 2u8, 100u8] { - let (_, frame) = recv_expect!(queues.recv()); + let (_, frame) = recv_expect!(queues.recv())?; assert_eq!( frame, expected, "frames drained in unexpected order: expected {expected}, got {frame}" diff --git a/tests/response.rs b/tests/response.rs index 4526b14b..ed4860a8 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -68,7 +68,7 @@ async fn send_response_encodes_and_frames() -> TestResult { .await .map_err(|e| format!("send_response failed: {e}"))?; - let frames = decode_frames(out); + let frames = decode_frames(out)?; assert_eq!(frames.len(), 1, "expected a single response frame"); let frame = frames.first().ok_or("expected frame missing")?; let (decoded, _) = @@ -234,7 +234,7 @@ async fn send_response_honours_buffer_capacity() -> TestResult { .await .map_err(|e| format!("send_response failed: {e}"))?; - let frames = decode_frames_with_max(out, LARGE_FRAME); + let frames = decode_frames_with_max(out, LARGE_FRAME)?; assert_eq!(frames.len(), 1, "expected a single response frame"); let frame = frames.first().ok_or("response frame missing")?; let (decoded, _) = Large::from_bytes(frame).map_err(|e| format!("deserialize failed: {e}"))?; @@ -261,10 +261,10 @@ async fn process_stream_honours_buffer_capacity() -> TestResult { .map_err(|e| format!("serialize failed: {e}"))?; let mut codec = app.length_codec(); - let frame = encode_frame(&mut codec, bytes); + let frame = encode_frame(&mut codec, bytes)?; let out = run_app(app, vec![frame], Some(10 * 1024 * 1024)).await?; - let frames = decode_frames_with_max(out, LARGE_FRAME); + let frames = decode_frames_with_max(out, LARGE_FRAME)?; assert_eq!(frames.len(), 1, "expected a single response frame"); let frame = frames.first().ok_or("response frame missing")?; let (resp_env, _) = BincodeSerializer diff --git a/tests/routes.rs b/tests/routes.rs index 5307a817..caf007ac 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -58,7 +58,7 @@ async fn handler_receives_message_and_echoes_response() -> TestResult<()> { let out = drive_with_bincode(app, env).await?; - let frames = decode_frames(out); + let frames = decode_frames(out)?; let [first] = frames.as_slice() else { return Err("expected a single response frame".into()); }; @@ -93,7 +93,7 @@ async fn handler_echoes_with_none_correlation_id() -> TestResult<()> { }; let out = drive_with_bincode(app, env).await?; - let frames = decode_frames(out); + let frames = decode_frames(out)?; let [first] = frames.as_slice() else { return Err("expected a single response frame".into()); }; @@ -136,7 +136,7 @@ async fn multiple_frames_processed_in_sequence() -> TestResult<()> { let out = drive_with_frames(app, encoded_frames).await?; - let frames = decode_frames(out); + let frames = decode_frames(out)?; let [first, second] = frames.as_slice() else { return Err("expected two response frames".into()); }; @@ -183,7 +183,7 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) -> Tes codec.encode(env_bytes.into(), &mut frame_buf)?; let out = drive_with_frames(app, vec![frame_buf.to_vec()]).await?; - let frames = decode_frames(out); + let frames = decode_frames(out)?; let [first] = frames.as_slice() else { return Err("expected a single response frame".into()); }; diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index d866accb..e0959606 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -24,5 +24,6 @@ mod message_assembly_scenarios; mod multi_packet_scenarios; mod panic_scenarios; mod request_parts_scenarios; +mod serializer_boundaries_scenarios; mod stream_end_scenarios; mod unified_codec_scenarios; diff --git a/tests/scenarios/serializer_boundaries_scenarios.rs b/tests/scenarios/serializer_boundaries_scenarios.rs new file mode 100644 index 00000000..2fcc63db --- /dev/null +++ b/tests/scenarios/serializer_boundaries_scenarios.rs @@ -0,0 +1,21 @@ +//! Scenario tests for serializer boundary feature. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::serializer_boundaries::*; + +#[scenario( + path = "tests/features/serializer_boundaries.feature", + name = "Legacy message round-trips through serializer-agnostic adapters" +)] +fn legacy_message_round_trip(serializer_boundaries_world: SerializerBoundariesWorld) { + let _ = serializer_boundaries_world; +} + +#[scenario( + path = "tests/features/serializer_boundaries.feature", + name = "Metadata context is forwarded to deserialization" +)] +fn metadata_context_forwarded(serializer_boundaries_world: SerializerBoundariesWorld) { + let _ = serializer_boundaries_world; +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 292badfd..c52de558 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -20,6 +20,7 @@ mod message_assembly_steps; mod multi_packet_steps; mod panic_steps; mod request_parts_steps; +mod serializer_boundaries_steps; mod stream_end_steps; mod unified_codec_steps; diff --git a/tests/steps/serializer_boundaries_steps.rs b/tests/steps/serializer_boundaries_steps.rs new file mode 100644 index 00000000..c86dcb54 --- /dev/null +++ b/tests/steps/serializer_boundaries_steps.rs @@ -0,0 +1,60 @@ +//! Step definitions for serializer boundary behavioural tests. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::serializer_boundaries::{SerializerBoundariesWorld, TestResult}; + +#[given("a legacy payload value {value:u32}")] +fn given_legacy_payload_value( + serializer_boundaries_world: &mut SerializerBoundariesWorld, + value: u32, +) { + serializer_boundaries_world.set_legacy_value(value); +} + +#[when("the legacy payload is encoded and decoded")] +fn when_legacy_payload_round_trip( + serializer_boundaries_world: &mut SerializerBoundariesWorld, +) -> TestResult { + serializer_boundaries_world.round_trip_legacy_payload() +} + +#[then("the decoded legacy payload value is {expected:u32}")] +fn then_decoded_legacy_value( + serializer_boundaries_world: &mut SerializerBoundariesWorld, + expected: u32, +) -> TestResult { + serializer_boundaries_world.assert_decoded_legacy_value(expected) +} + +#[given("deserialize context message id {message_id:u32} and correlation id {correlation_id:u64}")] +fn given_deserialize_context( + serializer_boundaries_world: &mut SerializerBoundariesWorld, + message_id: u32, + correlation_id: u64, +) { + serializer_boundaries_world.set_deserialize_context(message_id, correlation_id); +} + +#[when("a context-aware serializer decodes with context")] +fn when_context_aware_decode( + serializer_boundaries_world: &mut SerializerBoundariesWorld, +) -> TestResult { + serializer_boundaries_world.decode_with_context() +} + +#[then("the captured message id is {expected:u32}")] +fn then_captured_message_id( + serializer_boundaries_world: &mut SerializerBoundariesWorld, + expected: u32, +) -> TestResult { + serializer_boundaries_world.assert_captured_message_id(expected) +} + +#[then("the captured correlation id is {expected:u64}")] +fn then_captured_correlation_id( + serializer_boundaries_world: &mut SerializerBoundariesWorld, + expected: u64, +) -> TestResult { + serializer_boundaries_world.assert_captured_correlation_id(expected) +} diff --git a/wireframe_testing/README.md b/wireframe_testing/README.md index 482409e9..b73e16e9 100644 --- a/wireframe_testing/README.md +++ b/wireframe_testing/README.md @@ -24,7 +24,7 @@ async fn drives_app() -> std::io::Result<()> { let app = WireframeApp::new().expect("failed to initialise app"); let raw = drive_with_bincode(app, 42u8).await?; - let frames = decode_frames(raw); + let frames = decode_frames(raw)?; assert_eq!(frames.len(), 1); Ok(()) diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 8ea26980..a3dece4c 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -3,697 +3,57 @@ //! These functions spin up an application on an in-memory duplex stream and //! collect the bytes written back by the app for assertions. -use std::io; - -use bincode::config; -use bytes::BytesMut; -use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; -use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use wireframe::{ - app::{Envelope, Packet, WireframeApp}, - frame::{FrameMetadata, LengthFormat}, - serializer::Serializer, + app::Envelope, + frame::FrameMetadata, + serializer::{MessageCompatibilitySerializer, Serializer}, }; +mod codec; +mod drive; +mod payloads; +mod runtime; + +#[cfg(test)] +mod tests; + +/// Serializer bounds expected by the in-memory test harness. pub trait TestSerializer: - Serializer + FrameMetadata + Send + Sync + 'static + Serializer + + MessageCompatibilitySerializer + + FrameMetadata + + Send + + Sync + + 'static { } impl TestSerializer for T where - T: Serializer + FrameMetadata + Send + Sync + 'static + T: Serializer + + MessageCompatibilitySerializer + + FrameMetadata + + Send + + Sync + + 'static { } -/// Run `server_fn` against a duplex stream, writing each `frame` to the client -/// half and returning the bytes produced by the server. -/// -/// The server function receives the server half of a `tokio::io::duplex` -/// connection. Every provided frame is written to the client side in order and -/// the collected output is returned once the server task completes. If the -/// server panics, the panic message is surfaced as an `io::Error` beginning -/// with `"server task failed"`. -/// -/// ```rust -/// use tokio::io::{AsyncWriteExt, DuplexStream}; -/// use wireframe_testing::helpers::drive_internal; -/// -/// async fn echo(mut server: DuplexStream) { let _ = server.write_all(&[1, 2]).await; } -/// -/// # async fn demo() -> std::io::Result<()> { -/// let bytes = drive_internal(echo, vec![vec![0]], 64).await?; -/// assert_eq!(bytes, [1, 2]); -/// # Ok(()) -/// # } -/// ``` -async fn drive_internal( - server_fn: F, - frames: Vec>, - capacity: usize, -) -> io::Result> -where - F: FnOnce(DuplexStream) -> Fut, - Fut: std::future::Future + Send, -{ - let (mut client, server) = duplex(capacity); - - let server_fut = async { - use futures::FutureExt as _; - let result = std::panic::AssertUnwindSafe(server_fn(server)) - .catch_unwind() - .await; - match result { - Ok(_) => Ok(()), - Err(panic) => { - let panic_msg = wireframe::panic::format_panic(&panic); - Err(io::Error::new( - io::ErrorKind::Other, - format!("server task failed: {panic_msg}"), - )) - } - } - }; - - let client_fut = async { - for frame in &frames { - client.write_all(frame).await?; - } - client.shutdown().await?; - - let mut buf = Vec::new(); - client.read_to_end(&mut buf).await?; - io::Result::Ok(buf) - }; - - let ((), buf) = tokio::try_join!(server_fut, client_fut)?; - Ok(buf) -} - -const DEFAULT_CAPACITY: usize = 4096; -const MAX_CAPACITY: usize = 1024 * 1024 * 10; // 10MB limit +pub(crate) const DEFAULT_CAPACITY: usize = 4096; +pub(crate) const MAX_CAPACITY: usize = 1024 * 1024 * 10; // 10MB limit pub(crate) const EMPTY_SERVER_CAPACITY: usize = 64; /// Shared frame cap used by helpers and tests to avoid drift. pub const TEST_MAX_FRAME: usize = DEFAULT_CAPACITY; -#[inline] -pub fn new_test_codec(max_len: usize) -> LengthDelimitedCodec { - let mut builder = LengthDelimitedCodec::builder(); - builder.max_frame_length(max_len); - builder.new_codec() -} - -/// Decode all length-prefixed `frames` using a test codec and assert no bytes remain. -/// -/// This helper constructs a [`LengthDelimitedCodec`] capped at [`TEST_MAX_FRAME`] -/// and decodes each frame in `bytes`, ensuring the buffer is fully consumed. -/// -/// ```rust -/// # use wireframe_testing::decode_frames; -/// let frames = decode_frames(vec![0, 0, 0, 1, 42]); -/// assert_eq!(frames, vec![vec![42]]); -/// ``` -#[must_use] -pub fn decode_frames(bytes: Vec) -> Vec> { - decode_frames_with_max(bytes, TEST_MAX_FRAME) -} - -/// Decode `bytes` into frames using a codec capped at `max_len`. -/// -/// Asserts that no trailing bytes remain after all frames are decoded. -#[must_use] -pub fn decode_frames_with_max(bytes: Vec, max_len: usize) -> Vec> { - let mut codec = new_test_codec(max_len); - let mut buf = BytesMut::from(&bytes[..]); - let mut frames = Vec::new(); - while let Some(frame) = codec.decode(&mut buf).expect("decode failed") { - frames.push(frame.to_vec()); - } - assert!(buf.is_empty(), "unexpected trailing bytes after decode"); - frames -} - -macro_rules! forward_default { - ( - $(#[$docs:meta])* $vis:vis fn $name:ident( - $app:ident : $app_ty:ty, - $arg:ident : $arg_ty:ty - ) -> $ret:ty - => $inner:ident($app_expr:ident, $arg_expr:expr) - ) => { - $(#[$docs])* - $vis async fn $name( - $app: $app_ty, - $arg: $arg_ty, - ) -> $ret - where - S: TestSerializer, - C: Send + 'static, - E: Packet, - { - $inner($app_expr, $arg_expr, DEFAULT_CAPACITY).await - } - }; -} - -macro_rules! forward_with_capacity { - ( - $(#[$docs:meta])* $vis:vis fn $name:ident( - $app:ident : $app_ty:ty, - $arg:ident : $arg_ty:ty, - capacity: usize - ) -> $ret:ty - => $inner:ident($app_expr:ident, $arg_expr:expr, capacity) - ) => { - $(#[$docs])* - $vis async fn $name( - $app: $app_ty, - $arg: $arg_ty, - capacity: usize, - ) -> $ret - where - S: TestSerializer, - C: Send + 'static, - E: Packet, - { - $inner($app_expr, $arg_expr, capacity).await - } - }; -} - -/// Drive `app` with a single length-prefixed `frame` and return the bytes -/// produced by the server. -/// -/// The app runs on an in-memory duplex stream so tests need not open real -/// sockets. -/// -/// # Errors -/// -/// Returns any I/O errors encountered while interacting with the in-memory -/// duplex stream. -/// -/// ```rust -/// # use wireframe_testing::drive_with_frame; -/// # use wireframe::app::WireframeApp; -/// # async fn demo() -> std::io::Result<()> { -/// let app = WireframeApp::new().expect("failed to initialize app"); -/// let bytes = drive_with_frame(app, vec![1, 2, 3]).await?; -/// # Ok(()) -/// # } -/// ``` -pub async fn drive_with_frame( - app: WireframeApp, - frame: Vec, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - drive_with_frame_with_capacity(app, frame, DEFAULT_CAPACITY).await -} - -forward_with_capacity! { - /// Drive `app` with a single frame using a duplex buffer of `capacity` bytes. - /// - /// Adjusting the buffer size helps exercise edge cases such as small channels. - /// - /// ```rust - /// # use wireframe_testing::drive_with_frame_with_capacity; - /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> std::io::Result<()> { - /// let app = WireframeApp::new().expect("failed to initialize app"); - /// let bytes = drive_with_frame_with_capacity(app, vec![0], 512).await?; - /// # Ok(()) - /// # } - /// ``` - pub fn drive_with_frame_with_capacity(app: WireframeApp, frame: Vec, capacity: usize) -> io::Result> - => drive_with_frames_with_capacity(app, vec![frame], capacity) -} - -forward_default! { - /// Drive `app` with a sequence of frames using the default buffer size. - /// - /// Each frame is written to the duplex stream in order. - /// - /// ```rust - /// # use wireframe_testing::drive_with_frames; - /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> std::io::Result<()> { - /// let app = WireframeApp::new().expect("failed to initialize app"); - /// let out = drive_with_frames(app, vec![vec![1], vec![2]]).await?; - /// # Ok(()) - /// # } - /// ``` - pub fn drive_with_frames(app: WireframeApp, frames: Vec>) -> io::Result> - => drive_with_frames_with_capacity(app, frames) -} - -/// Drive `app` with multiple frames using a duplex buffer of `capacity` bytes. -/// -/// This variant exposes the buffer size for fine-grained control in tests. -/// -/// ```rust -/// # use wireframe_testing::drive_with_frames_with_capacity; -/// # use wireframe::app::WireframeApp; -/// # async fn demo() -> std::io::Result<()> { -/// let app = WireframeApp::new().expect("failed to initialize app"); -/// let out = drive_with_frames_with_capacity(app, vec![vec![1], vec![2]], 1024).await?; -/// # Ok(()) -/// # } -/// ``` -pub async fn drive_with_frames_with_capacity( - app: WireframeApp, - frames: Vec>, - capacity: usize, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - drive_internal( - |server| async move { app.handle_connection(server).await }, - frames, - capacity, - ) - .await -} - -/// Encode payloads as length-delimited frames and drive `app`. -/// -/// This helper wraps each payload using the default length-delimited framing -/// format before sending it to the application. -/// -/// ```rust -/// # use wireframe_testing::drive_with_payloads; -/// # use wireframe::app::WireframeApp; -/// # async fn demo() -> std::io::Result<()> { -/// let app = WireframeApp::new().expect("failed to initialize app"); -/// let out = drive_with_payloads(app, vec![vec![1], vec![2]]).await?; -/// # let _ = out; -/// # Ok(()) -/// # } -/// ``` -pub async fn drive_with_payloads( - app: WireframeApp, - payloads: Vec>, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - drive_with_payloads_with_capacity(app, payloads, DEFAULT_CAPACITY).await -} - -/// Encode payloads as length-delimited frames and drive a mutable `app`. -/// -/// ```rust -/// # use wireframe_testing::drive_with_payloads_mut; -/// # use wireframe::app::WireframeApp; -/// # async fn demo() -> std::io::Result<()> { -/// let mut app = WireframeApp::new().expect("failed to initialize app"); -/// let out = drive_with_payloads_mut(&mut app, vec![vec![1], vec![2]]).await?; -/// # let _ = out; -/// # Ok(()) -/// # } -/// ``` -pub async fn drive_with_payloads_mut( - app: &mut WireframeApp, - payloads: Vec>, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - drive_with_payloads_with_capacity_mut(app, payloads, DEFAULT_CAPACITY).await -} - -fn encode_payloads( - payloads: Vec>, - mut codec: LengthDelimitedCodec, -) -> io::Result>> { - payloads - .into_iter() - .map(|payload| { - let header_len = LengthFormat::default().bytes(); - let mut buf = BytesMut::with_capacity(payload.len() + header_len); - codec.encode(payload.into(), &mut buf).map_err(|err| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("frame encode failed: {err}"), - ) - })?; - Ok(buf.to_vec()) - }) - .collect() -} - -async fn drive_with_payloads_with_capacity( - app: WireframeApp, - payloads: Vec>, - capacity: usize, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - let codec = new_test_codec(DEFAULT_CAPACITY); - let frames = encode_payloads(payloads, codec)?; - drive_with_frames_with_capacity(app, frames, capacity).await -} - -async fn drive_with_payloads_with_capacity_mut( - app: &mut WireframeApp, - payloads: Vec>, - capacity: usize, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - let codec = new_test_codec(DEFAULT_CAPACITY); - let frames = encode_payloads(payloads, codec)?; - drive_with_frames_with_capacity_mut(app, frames, capacity).await -} - -forward_default! { - /// Feed a single frame into a mutable `app`, allowing the instance to be reused - /// across calls. - /// - /// ```rust - /// # use wireframe_testing::drive_with_frame_mut; - /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> std::io::Result<()> { - /// let mut app = WireframeApp::new().expect("failed to initialize app"); - /// let bytes = drive_with_frame_mut(&mut app, vec![1]).await?; - /// # Ok(()) - /// # } - /// ``` - pub fn drive_with_frame_mut(app: &mut WireframeApp, frame: Vec) -> io::Result> - => drive_with_frame_with_capacity_mut(app, frame) -} - -forward_with_capacity! { - /// Feed a single frame into `app` using a duplex buffer of `capacity` bytes. - /// - /// ```rust - /// # use wireframe_testing::drive_with_frame_with_capacity_mut; - /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> std::io::Result<()> { - /// let mut app = WireframeApp::new().expect("failed to initialize app"); - /// let bytes = drive_with_frame_with_capacity_mut(&mut app, vec![1], 256).await?; - /// # Ok(()) - /// # } - /// ``` - pub fn drive_with_frame_with_capacity_mut(app: &mut WireframeApp, frame: Vec, capacity: usize) -> io::Result> - => drive_with_frames_with_capacity_mut(app, vec![frame], capacity) -} - -forward_default! { - /// Feed multiple frames into a mutable `app`. - /// - /// ```rust - /// # use wireframe_testing::drive_with_frames_mut; - /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> std::io::Result<()> { - /// let mut app = WireframeApp::new().expect("failed to initialize app"); - /// let out = drive_with_frames_mut(&mut app, vec![vec![1], vec![2]]).await?; - /// # Ok(()) - /// # } - /// ``` - pub fn drive_with_frames_mut(app: &mut WireframeApp, frames: Vec>) -> io::Result> - => drive_with_frames_with_capacity_mut(app, frames) -} - -/// Feed multiple frames into `app` with a duplex buffer of `capacity` bytes. -/// -/// ```rust -/// # use wireframe_testing::drive_with_frames_with_capacity_mut; -/// # use wireframe::app::WireframeApp; -/// # async fn demo() -> std::io::Result<()> { -/// let mut app = WireframeApp::new().expect("failed to initialize app"); -/// let out = drive_with_frames_with_capacity_mut(&mut app, vec![vec![1], vec![2]], 64).await?; -/// # Ok(()) -/// # } -/// ``` -pub async fn drive_with_frames_with_capacity_mut( - app: &mut WireframeApp, - frames: Vec>, - capacity: usize, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - drive_internal( - |server| async { app.handle_connection(server).await }, - frames, - capacity, - ) - .await -} - -/// Encode `msg` using bincode, frame it and drive `app`. -/// -/// ```rust -/// # use wireframe_testing::drive_with_bincode; -/// # use wireframe::app::WireframeApp; -/// #[derive(bincode::Encode)] -/// struct Ping(u8); -/// # async fn demo() -> std::io::Result<()> { -/// let app = WireframeApp::new().expect("failed to initialize app"); -/// let bytes = drive_with_bincode(app, Ping(1)).await?; -/// # Ok(()) -/// # } -/// ``` -pub async fn drive_with_bincode( - app: WireframeApp, - msg: M, -) -> io::Result> -where - M: bincode::Encode, - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - let bytes = bincode::encode_to_vec(msg, config::standard()).map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("bincode encode failed: {e}"), - ) - })?; - let mut codec = new_test_codec(DEFAULT_CAPACITY); - let mut framed = BytesMut::with_capacity(bytes.len() + 4); - codec.encode(bytes.into(), &mut framed)?; - drive_with_frame(app, framed.to_vec()).await -} - -/// Run `app` with input `frames` using an optional duplex buffer `capacity`. -/// -/// When `capacity` is `None`, a buffer of [`DEFAULT_CAPACITY`] bytes is used. -/// Frames are written to the client side in order and the bytes emitted by the -/// server are collected for inspection. -/// -/// # Errors -/// -/// Returns an error if `capacity` is zero or exceeds [`MAX_CAPACITY`]. Any -/// panic in the application task or I/O error on the duplex stream is also -/// surfaced as an error. -/// -/// ```rust -/// # use wireframe_testing::run_app; -/// # use wireframe::app::WireframeApp; -/// # async fn demo() -> std::io::Result<()> { -/// let app = WireframeApp::new().expect("failed to initialize app"); -/// let out = run_app(app, vec![vec![1]], None).await?; -/// # Ok(()) -/// # } -/// ``` - -/// Encode bytes with a length-delimited `codec`, preallocating the prefix. -/// -/// Panics if encoding fails. -#[must_use] -pub fn encode_frame(codec: &mut LengthDelimitedCodec, bytes: Vec) -> Vec { - let header_len = LengthFormat::default().bytes(); - let mut buf = BytesMut::with_capacity(bytes.len() + header_len); - codec.encode(bytes.into(), &mut buf).expect("encode failed"); - buf.to_vec() -} - -pub async fn run_app( - app: WireframeApp, - frames: Vec>, - capacity: Option, -) -> io::Result> -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - let capacity = capacity.unwrap_or(DEFAULT_CAPACITY); - if capacity == 0 { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "capacity must be greater than zero", - )); - } - if capacity > MAX_CAPACITY { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("capacity must not exceed {MAX_CAPACITY} bytes"), - )); - } - - let (mut client, server) = duplex(capacity); - let server_task = tokio::spawn(async move { app.handle_connection(server).await }); - - for frame in &frames { - client.write_all(frame).await?; - } - client.shutdown().await?; - - let mut buf = Vec::new(); - client.read_to_end(&mut buf).await?; - - if let Err(e) = server_task.await { - return Err(io::Error::new( - io::ErrorKind::Other, - format!("server task failed: {e}"), - )); - } - - Ok(buf) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use futures::future::BoxFuture; - use wireframe::{ - Serializer, - app::{Envelope, WireframeApp}, - serializer::BincodeSerializer, - }; - - use super::*; - - #[tokio::test] - async fn run_app_rejects_zero_capacity() { - let app: WireframeApp = - WireframeApp::new().expect("failed to create app"); - let err = run_app(app, vec![], Some(0)) - .await - .expect_err("capacity of zero should error"); - assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); - } - - #[tokio::test] - async fn run_app_rejects_excess_capacity() { - let app: WireframeApp = - WireframeApp::new().expect("failed to create app"); - let err = run_app(app, vec![], Some(MAX_CAPACITY + 1)) - .await - .expect_err("capacity beyond max should error"); - assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); - } - - #[tokio::test] - async fn drive_with_payloads_wraps_frames() -> io::Result<()> { - let app: WireframeApp = - WireframeApp::new().expect("failed to create app"); - let app = app - .route( - 1, - Arc::new(|_: &Envelope| -> BoxFuture<'static, ()> { Box::pin(async {}) }), - ) - .expect("route registration should succeed"); - let serializer = BincodeSerializer::default(); - let payload = vec![1_u8, 2, 3]; - let env = Envelope::new(1, Some(7), payload.clone()); - let encoded = serializer - .serialize(&env) - .expect("failed to serialize envelope"); - - let out = drive_with_payloads(app, vec![encoded]).await?; - let frames = decode_frames(out); - let [first] = frames.as_slice() else { - panic!("expected a single response frame"); - }; - let (decoded, _) = serializer - .deserialize::(first) - .expect("failed to deserialise envelope"); - assert_eq!( - decoded.payload_bytes(), - payload.as_slice(), - "payload mismatch" - ); - Ok(()) - } -} - -/// Run `app` against an empty duplex stream. -/// -/// This helper drives the connection lifecycle without sending any frames, -/// ensuring setup and teardown callbacks execute. -/// -/// # Panics -/// -/// Panics if `handle_connection` fails. -/// -/// ```rust -/// # use wireframe_testing::run_with_duplex_server; -/// # use wireframe::app::WireframeApp; -/// # async fn demo() { -/// let app = WireframeApp::new() -/// .expect("failed to initialize app"); -/// run_with_duplex_server(app).await; -/// } -/// ``` -pub async fn run_with_duplex_server(app: WireframeApp) -where - S: TestSerializer, - C: Send + 'static, - E: Packet, -{ - let (_, server) = duplex(EMPTY_SERVER_CAPACITY); // discard client half - app.handle_connection(server).await; -} - -/// Await the provided future and panic with context on failure. -/// -/// In debug builds, the generated message includes the call site for easier -/// troubleshooting. -#[macro_export] -macro_rules! push_expect { - ($fut:expr) => {{ - $fut.await - .expect(concat!("push failed at ", file!(), ":", line!())) - }}; - ($fut:expr, $msg:expr) => {{ - let m = ::std::format!("{msg} at {}:{}", file!(), line!(), msg = $msg); - $fut.await.expect(&m) - }}; -} - -/// Await the provided future and panic with context on failure. -/// -/// In debug builds, the generated message includes the call site for easier -/// troubleshooting. -#[macro_export] -macro_rules! recv_expect { - ($fut:expr) => {{ - $fut.await - .expect(concat!("recv failed at ", file!(), ":", line!())) - }}; - ($fut:expr, $msg:expr) => {{ - let m = ::std::format!("{msg} at {}:{}", file!(), line!(), msg = $msg); - $fut.await.expect(&m) - }}; -} +pub use codec::{decode_frames, decode_frames_with_max, encode_frame, new_test_codec}; +pub use drive::{ + drive_with_frame, + drive_with_frame_mut, + drive_with_frame_with_capacity, + drive_with_frame_with_capacity_mut, + drive_with_frames, + drive_with_frames_mut, + drive_with_frames_with_capacity, + drive_with_frames_with_capacity_mut, +}; +pub use payloads::{drive_with_bincode, drive_with_payloads, drive_with_payloads_mut}; +pub use runtime::{run_app, run_with_duplex_server}; diff --git a/wireframe_testing/src/helpers/codec.rs b/wireframe_testing/src/helpers/codec.rs new file mode 100644 index 00000000..74800830 --- /dev/null +++ b/wireframe_testing/src/helpers/codec.rs @@ -0,0 +1,77 @@ +//! Framing codec helpers used by test harness utilities. + +use std::io; + +use bytes::BytesMut; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +use wireframe::frame::LengthFormat; + +use super::TEST_MAX_FRAME; + +/// Build a [`LengthDelimitedCodec`] configured with `max_len` as the maximum +/// accepted frame length. +/// +/// The codec uses the default [`LengthFormat`] framing and enforces +/// `max_len` during decode. +#[inline] +pub fn new_test_codec(max_len: usize) -> LengthDelimitedCodec { + let mut builder = LengthDelimitedCodec::builder(); + builder.max_frame_length(max_len); + builder.new_codec() +} + +/// Decode all length-prefixed `frames` using a test codec. +/// +/// This helper constructs a [`LengthDelimitedCodec`] capped at [`TEST_MAX_FRAME`] +/// and decodes each frame in `bytes`, returning an error when decode fails or +/// trailing bytes remain. +/// +/// ```rust +/// # use wireframe_testing::decode_frames; +/// let frames = decode_frames(vec![0, 0, 0, 1, 42])?; +/// assert_eq!(frames, vec![vec![42]]); +/// # Ok::<(), std::io::Error>(()) +/// ``` +pub fn decode_frames(bytes: Vec) -> io::Result>> { + decode_frames_with_max(bytes, TEST_MAX_FRAME) +} + +/// Decode `bytes` into frames using a codec capped at `max_len`. +/// +/// Returns an error if decode fails or trailing bytes remain after frame +/// extraction. +pub fn decode_frames_with_max(bytes: Vec, max_len: usize) -> io::Result>> { + let mut codec = new_test_codec(max_len); + let mut buf = BytesMut::from(&bytes[..]); + let mut frames = Vec::new(); + while let Some(frame) = codec.decode(&mut buf).map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("decode failed: {error}"), + ) + })? { + frames.push(frame.to_vec()); + } + if !buf.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unexpected trailing bytes after decode: {}", buf.len()), + )); + } + Ok(frames) +} + +/// Encode bytes with a length-delimited `codec`, preallocating the prefix. +/// +/// Returns an error if framing fails. +pub fn encode_frame(codec: &mut LengthDelimitedCodec, bytes: Vec) -> io::Result> { + let header_len = LengthFormat::default().bytes(); + let mut buf = BytesMut::with_capacity(bytes.len() + header_len); + codec.encode(bytes.into(), &mut buf).map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("encode failed: {error}"), + ) + })?; + Ok(buf.to_vec()) +} diff --git a/wireframe_testing/src/helpers/drive.rs b/wireframe_testing/src/helpers/drive.rs new file mode 100644 index 00000000..bd5c9e3f --- /dev/null +++ b/wireframe_testing/src/helpers/drive.rs @@ -0,0 +1,297 @@ +//! Frame-oriented in-memory driving helpers. + +use std::io; + +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; +use wireframe::app::{Packet, WireframeApp}; + +use super::{DEFAULT_CAPACITY, TestSerializer}; + +/// Run `server_fn` against a duplex stream, writing each `frame` to the client +/// half and returning the bytes produced by the server. +/// +/// The server function receives the server half of a `tokio::io::duplex` +/// connection. Every provided frame is written to the client side in order and +/// the collected output is returned once the server task completes. If the +/// server panics, the panic message is surfaced as an `io::Error` beginning +/// with `"server task failed"`. +/// +/// ```rust +/// use tokio::io::{AsyncWriteExt, DuplexStream}; +/// use wireframe_testing::helpers::drive::drive_internal; +/// +/// async fn echo(mut server: DuplexStream) { let _ = server.write_all(&[1, 2]).await; } +/// +/// # async fn demo() -> std::io::Result<()> { +/// let bytes = drive_internal(echo, vec![vec![0]], 64).await?; +/// assert_eq!(bytes, [1, 2]); +/// # Ok(()) +/// # } +/// ``` +pub(super) async fn drive_internal( + server_fn: F, + frames: Vec>, + capacity: usize, +) -> io::Result> +where + F: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +{ + let (mut client, server) = duplex(capacity); + + let server_fut = async { + use futures::FutureExt as _; + let result = std::panic::AssertUnwindSafe(server_fn(server)) + .catch_unwind() + .await; + match result { + Ok(_) => Ok(()), + Err(panic) => { + let panic_msg = wireframe::panic::format_panic(&panic); + Err(io::Error::new( + io::ErrorKind::Other, + format!("server task failed: {panic_msg}"), + )) + } + } + }; + + let client_fut = async { + for frame in &frames { + client.write_all(frame).await?; + } + client.shutdown().await?; + + let mut buf = Vec::new(); + client.read_to_end(&mut buf).await?; + io::Result::Ok(buf) + }; + + let ((), buf) = tokio::try_join!(server_fut, client_fut)?; + Ok(buf) +} + +macro_rules! forward_default { + ( + $(#[$docs:meta])* $vis:vis fn $name:ident( + $app:ident : $app_ty:ty, + $arg:ident : $arg_ty:ty + ) -> $ret:ty + => $inner:ident($app_expr:ident, $arg_expr:expr) + ) => { + $(#[$docs])* + $vis async fn $name( + $app: $app_ty, + $arg: $arg_ty, + ) -> $ret + where + S: TestSerializer, + C: Send + 'static, + E: Packet, + { + $inner($app_expr, $arg_expr, DEFAULT_CAPACITY).await + } + }; +} + +macro_rules! forward_with_capacity { + ( + $(#[$docs:meta])* $vis:vis fn $name:ident( + $app:ident : $app_ty:ty, + $arg:ident : $arg_ty:ty, + capacity: usize + ) -> $ret:ty + => $inner:ident($app_expr:ident, $arg_expr:expr, capacity) + ) => { + $(#[$docs])* + $vis async fn $name( + $app: $app_ty, + $arg: $arg_ty, + capacity: usize, + ) -> $ret + where + S: TestSerializer, + C: Send + 'static, + E: Packet, + { + $inner($app_expr, $arg_expr, capacity).await + } + }; +} + +/// Drive `app` with a single length-prefixed `frame` and return the bytes +/// produced by the server. +/// +/// The app runs on an in-memory duplex stream so tests need not open real +/// sockets. +/// +/// # Errors +/// +/// Returns any I/O errors encountered while interacting with the in-memory +/// duplex stream. +/// +/// ```rust +/// # use wireframe_testing::drive_with_frame; +/// # use wireframe::app::WireframeApp; +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); +/// let bytes = drive_with_frame(app, vec![1, 2, 3]).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_frame( + app: WireframeApp, + frame: Vec, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + drive_with_frame_with_capacity(app, frame, DEFAULT_CAPACITY).await +} + +forward_with_capacity! { + /// Drive `app` with a single frame using a duplex buffer of `capacity` bytes. + /// + /// Adjusting the buffer size helps exercise edge cases such as small channels. + /// + /// ```rust + /// # use wireframe_testing::drive_with_frame_with_capacity; + /// # use wireframe::app::WireframeApp; + /// # async fn demo() -> std::io::Result<()> { + /// let app = WireframeApp::new().expect("failed to initialize app"); + /// let bytes = drive_with_frame_with_capacity(app, vec![0], 512).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn drive_with_frame_with_capacity(app: WireframeApp, frame: Vec, capacity: usize) -> io::Result> + => drive_with_frames_with_capacity(app, vec![frame], capacity) +} + +forward_default! { + /// Drive `app` with a sequence of frames using the default buffer size. + /// + /// Each frame is written to the duplex stream in order. + /// + /// ```rust + /// # use wireframe_testing::drive_with_frames; + /// # use wireframe::app::WireframeApp; + /// # async fn demo() -> std::io::Result<()> { + /// let app = WireframeApp::new().expect("failed to initialize app"); + /// let out = drive_with_frames(app, vec![vec![1], vec![2]]).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn drive_with_frames(app: WireframeApp, frames: Vec>) -> io::Result> + => drive_with_frames_with_capacity(app, frames) +} + +/// Drive `app` with multiple frames using a duplex buffer of `capacity` bytes. +/// +/// This variant exposes the buffer size for fine-grained control in tests. +/// +/// ```rust +/// # use wireframe_testing::drive_with_frames_with_capacity; +/// # use wireframe::app::WireframeApp; +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); +/// let out = drive_with_frames_with_capacity(app, vec![vec![1], vec![2]], 1024).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_frames_with_capacity( + app: WireframeApp, + frames: Vec>, + capacity: usize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + drive_internal( + |server| async move { app.handle_connection(server).await }, + frames, + capacity, + ) + .await +} + +forward_default! { + /// Feed a single frame into a mutable `app`, allowing the instance to be reused + /// across calls. + /// + /// ```rust + /// # use wireframe_testing::drive_with_frame_mut; + /// # use wireframe::app::WireframeApp; + /// # async fn demo() -> std::io::Result<()> { + /// let mut app = WireframeApp::new().expect("failed to initialize app"); + /// let bytes = drive_with_frame_mut(&mut app, vec![1]).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn drive_with_frame_mut(app: &mut WireframeApp, frame: Vec) -> io::Result> + => drive_with_frame_with_capacity_mut(app, frame) +} + +forward_with_capacity! { + /// Feed a single frame into `app` using a duplex buffer of `capacity` bytes. + /// + /// ```rust + /// # use wireframe_testing::drive_with_frame_with_capacity_mut; + /// # use wireframe::app::WireframeApp; + /// # async fn demo() -> std::io::Result<()> { + /// let mut app = WireframeApp::new().expect("failed to initialize app"); + /// let bytes = drive_with_frame_with_capacity_mut(&mut app, vec![1], 256).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn drive_with_frame_with_capacity_mut(app: &mut WireframeApp, frame: Vec, capacity: usize) -> io::Result> + => drive_with_frames_with_capacity_mut(app, vec![frame], capacity) +} + +forward_default! { + /// Feed multiple frames into a mutable `app`. + /// + /// ```rust + /// # use wireframe_testing::drive_with_frames_mut; + /// # use wireframe::app::WireframeApp; + /// # async fn demo() -> std::io::Result<()> { + /// let mut app = WireframeApp::new().expect("failed to initialize app"); + /// let out = drive_with_frames_mut(&mut app, vec![vec![1], vec![2]]).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn drive_with_frames_mut(app: &mut WireframeApp, frames: Vec>) -> io::Result> + => drive_with_frames_with_capacity_mut(app, frames) +} + +/// Feed multiple frames into `app` with a duplex buffer of `capacity` bytes. +/// +/// ```rust +/// # use wireframe_testing::drive_with_frames_with_capacity_mut; +/// # use wireframe::app::WireframeApp; +/// # async fn demo() -> std::io::Result<()> { +/// let mut app = WireframeApp::new().expect("failed to initialize app"); +/// let out = drive_with_frames_with_capacity_mut(&mut app, vec![vec![1], vec![2]], 64).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_frames_with_capacity_mut( + app: &mut WireframeApp, + frames: Vec>, + capacity: usize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + drive_internal( + |server| async { app.handle_connection(server).await }, + frames, + capacity, + ) + .await +} diff --git a/wireframe_testing/src/helpers/payloads.rs b/wireframe_testing/src/helpers/payloads.rs new file mode 100644 index 00000000..bc314e44 --- /dev/null +++ b/wireframe_testing/src/helpers/payloads.rs @@ -0,0 +1,152 @@ +//! Payload-oriented in-memory driving helpers. + +use std::io; + +use bincode::config; +use bytes::BytesMut; +use tokio_util::codec::{Encoder, LengthDelimitedCodec}; +use wireframe::{ + app::{Packet, WireframeApp}, + frame::LengthFormat, +}; + +use super::{DEFAULT_CAPACITY, TestSerializer, drive, new_test_codec}; + +/// Encode payloads as length-delimited frames and drive `app`. +/// +/// This helper wraps each payload using the default length-delimited framing +/// format before sending it to the application. +/// +/// ```rust +/// # use wireframe_testing::drive_with_payloads; +/// # use wireframe::app::WireframeApp; +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); +/// let out = drive_with_payloads(app, vec![vec![1], vec![2]]).await?; +/// # let _ = out; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_payloads( + app: WireframeApp, + payloads: Vec>, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + drive_with_payloads_with_capacity(app, payloads, DEFAULT_CAPACITY).await +} + +/// Encode payloads as length-delimited frames and drive a mutable `app`. +/// +/// ```rust +/// # use wireframe_testing::drive_with_payloads_mut; +/// # use wireframe::app::WireframeApp; +/// # async fn demo() -> std::io::Result<()> { +/// let mut app = WireframeApp::new().expect("failed to initialize app"); +/// let out = drive_with_payloads_mut(&mut app, vec![vec![1], vec![2]]).await?; +/// # let _ = out; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_payloads_mut( + app: &mut WireframeApp, + payloads: Vec>, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + drive_with_payloads_with_capacity_mut(app, payloads, DEFAULT_CAPACITY).await +} + +fn encode_payloads( + payloads: Vec>, + mut codec: LengthDelimitedCodec, +) -> io::Result>> { + payloads + .into_iter() + .map(|payload| { + let header_len = LengthFormat::default().bytes(); + let mut buf = BytesMut::with_capacity(payload.len() + header_len); + codec.encode(payload.into(), &mut buf).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("frame encode failed: {err}"), + ) + })?; + Ok(buf.to_vec()) + }) + .collect() +} + +async fn drive_with_payloads_with_capacity( + app: WireframeApp, + payloads: Vec>, + capacity: usize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + let codec = new_test_codec(DEFAULT_CAPACITY); + let frames = encode_payloads(payloads, codec)?; + drive::drive_with_frames_with_capacity(app, frames, capacity).await +} + +async fn drive_with_payloads_with_capacity_mut( + app: &mut WireframeApp, + payloads: Vec>, + capacity: usize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + let codec = new_test_codec(DEFAULT_CAPACITY); + let frames = encode_payloads(payloads, codec)?; + drive::drive_with_frames_with_capacity_mut(app, frames, capacity).await +} + +/// Encode `msg` using bincode, frame it and drive `app`. +/// +/// ```rust +/// # use wireframe_testing::drive_with_bincode; +/// # use wireframe::app::WireframeApp; +/// #[derive(bincode::Encode)] +/// struct Ping(u8); +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); +/// let bytes = drive_with_bincode(app, Ping(1)).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_bincode( + app: WireframeApp, + msg: M, +) -> io::Result> +where + M: bincode::Encode, + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + let bytes = bincode::encode_to_vec(msg, config::standard()).map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("bincode encode failed: {error}"), + ) + })?; + let mut codec = new_test_codec(DEFAULT_CAPACITY); + let mut prefix_probe = BytesMut::new(); + codec.encode(Vec::::new().into(), &mut prefix_probe)?; + let prefix_len = prefix_probe.len(); + let mut framed = BytesMut::with_capacity(bytes.len() + prefix_len); + codec.encode(bytes.into(), &mut framed)?; + drive::drive_with_frame(app, framed.to_vec()).await +} diff --git a/wireframe_testing/src/helpers/runtime.rs b/wireframe_testing/src/helpers/runtime.rs new file mode 100644 index 00000000..6c53d279 --- /dev/null +++ b/wireframe_testing/src/helpers/runtime.rs @@ -0,0 +1,89 @@ +//! Runtime-level helpers for running apps against in-memory streams. + +use std::io; + +use tokio::io::duplex; +use wireframe::app::{Packet, WireframeApp}; + +use super::{EMPTY_SERVER_CAPACITY, MAX_CAPACITY, TestSerializer, drive::drive_internal}; + +/// Run `app` with input `frames` using an optional duplex buffer `capacity`. +/// +/// When `capacity` is `None`, a buffer of `DEFAULT_CAPACITY` bytes is used. +/// Frames are written to the client side in order and the bytes emitted by the +/// server are collected for inspection. +/// +/// # Errors +/// +/// Returns an error if `capacity` is zero or exceeds `MAX_CAPACITY`. Any panic +/// in the application task or I/O error on the duplex stream is also surfaced +/// as an error. +/// +/// ```rust +/// # use wireframe_testing::run_app; +/// # use wireframe::app::WireframeApp; +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); +/// let out = run_app(app, vec![vec![1]], None).await?; +/// # let _ = out; +/// # Ok(()) +/// # } +/// ``` +pub async fn run_app( + app: WireframeApp, + frames: Vec>, + capacity: Option, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + let capacity = capacity.unwrap_or(super::DEFAULT_CAPACITY); + if capacity == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "capacity must be greater than zero", + )); + } + if capacity > MAX_CAPACITY { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("capacity must not exceed {MAX_CAPACITY} bytes"), + )); + } + + drive_internal( + |server| async move { app.handle_connection(server).await }, + frames, + capacity, + ) + .await +} + +/// Run `app` against an empty duplex stream. +/// +/// This helper drives the connection lifecycle without sending any frames, +/// ensuring setup and teardown callbacks execute. +/// +/// # Panics +/// +/// Panics if `handle_connection` fails. +/// +/// ```rust +/// # use wireframe_testing::run_with_duplex_server; +/// # use wireframe::app::WireframeApp; +/// # async fn demo() { +/// let app = WireframeApp::new().expect("failed to initialize app"); +/// run_with_duplex_server(app).await; +/// # } +/// ``` +pub async fn run_with_duplex_server(app: WireframeApp) +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + let (_, server) = duplex(EMPTY_SERVER_CAPACITY); // discard client half + app.handle_connection(server).await; +} diff --git a/wireframe_testing/src/helpers/tests/helper_tests.rs b/wireframe_testing/src/helpers/tests/helper_tests.rs new file mode 100644 index 00000000..9bbce5cc --- /dev/null +++ b/wireframe_testing/src/helpers/tests/helper_tests.rs @@ -0,0 +1,91 @@ +//! Verifies helper utilities such as `run_app`, `drive_with_payloads`, +//! `decode_frames`, and `MAX_CAPACITY` handling. + +use std::{io, sync::Arc}; + +use futures::future::BoxFuture; +use wireframe::{ + Serializer, + app::{Envelope, WireframeApp}, + serializer::BincodeSerializer, +}; + +use crate::helpers::{MAX_CAPACITY, decode_frames, drive_with_payloads, run_app}; + +#[tokio::test] +async fn run_app_rejects_zero_capacity() { + let app: WireframeApp = + WireframeApp::new().expect("failed to create app"); + let err = run_app(app, vec![], Some(0)) + .await + .expect_err("capacity of zero should error"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); +} + +#[tokio::test] +async fn run_app_rejects_excess_capacity() { + let app: WireframeApp = + WireframeApp::new().expect("failed to create app"); + let err = run_app(app, vec![], Some(MAX_CAPACITY + 1)) + .await + .expect_err("capacity beyond max should error"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); +} + +#[tokio::test] +async fn drive_with_payloads_wraps_frames() -> io::Result<()> { + let app: WireframeApp = + WireframeApp::new().map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("failed to create app: {error}"), + ) + })?; + let app = app + .route( + 1, + Arc::new(|_: &Envelope| -> BoxFuture<'static, ()> { Box::pin(async {}) }), + ) + .map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("route registration should succeed: {error}"), + ) + })?; + let serializer = BincodeSerializer; + let payload = vec![1_u8, 2, 3]; + let env = Envelope::new(1, Some(7), payload.clone()); + let encoded = serializer.serialize(&env).map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("failed to serialize envelope: {error}"), + ) + })?; + + let out = drive_with_payloads(app, vec![encoded]).await?; + let frames = decode_frames(out)?; + if frames.len() != 1 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("expected a single response frame, got {}", frames.len()), + )); + } + let first = frames.first().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "expected a single response frame", + ) + })?; + let (decoded, _) = serializer.deserialize::(first).map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("failed to deserialise envelope: {error}"), + ) + })?; + assert_eq!( + decoded.payload_bytes(), + payload.as_slice(), + "payload mismatch" + ); + Ok(()) +} diff --git a/wireframe_testing/src/helpers/tests/mod.rs b/wireframe_testing/src/helpers/tests/mod.rs new file mode 100644 index 00000000..3ee0e868 --- /dev/null +++ b/wireframe_testing/src/helpers/tests/mod.rs @@ -0,0 +1,3 @@ +//! Tests for the helper submodules that drive apps and decode framed payloads. + +mod helper_tests; diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index 3998245c..dc472b13 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -22,6 +22,7 @@ pub mod echo_server; pub mod helpers; pub mod integration_helpers; pub mod logging; +pub mod macros; pub mod multi_packet; pub use echo_server::{ServerMode, process_frame}; diff --git a/wireframe_testing/src/macros.rs b/wireframe_testing/src/macros.rs new file mode 100644 index 00000000..3c6f3dfc --- /dev/null +++ b/wireframe_testing/src/macros.rs @@ -0,0 +1,51 @@ +//! Assertion macros shared by test helpers and integration tests. + +/// Await a push future and return a contextualized error on failure. +#[macro_export] +macro_rules! push_expect { + ($fut:expr) => {{ + $fut.await.map_err(|err| { + ::std::io::Error::other(::std::format!( + "push failed at {}:{}: {err}", + file!(), + line!() + )) + }) + }}; + ($fut:expr, $msg:expr) => {{ + $fut.await.map_err(|err| { + ::std::io::Error::other(::std::format!( + "{msg} at {}:{}: {err}", + file!(), + line!(), + msg = $msg + )) + }) + }}; +} + +/// Await a receive future and return a contextualized error on failure. +#[macro_export] +macro_rules! recv_expect { + ($fut:expr) => {{ + $fut.await.ok_or_else(|| { + ::std::io::Error::other(::std::format!( + "recv failed at {}:{}: channel closed", + file!(), + line!() + )) + }) + }}; + ($fut:expr, $msg:expr) => {{ + $fut.await.ok_or_else(|| { + ::std::io::Error::other(::std::format!( + "{msg} at {}:{}: channel closed", + file!(), + line!(), + msg = $msg + )) + }) + }}; +} + +pub use crate::{push_expect, recv_expect};