Skip to content

Add StreamingResponseExt trait and TypedResponseStream adapter#508

Merged
leynos merged 15 commits intomainfrom
add-streaming-response-helpers-jxq7p8
Mar 27, 2026
Merged

Add StreamingResponseExt trait and TypedResponseStream adapter#508
leynos merged 15 commits intomainfrom
add-streaming-response-helpers-jxq7p8

Conversation

@leynos
Copy link
Copy Markdown
Owner

@leynos leynos commented Mar 21, 2026

Summary

  • Adds ergonomic streaming helpers for client streaming responses: a public StreamingResponseExt trait and a TypedResponseStream adapter, enabling mapping protocol frames into domain items while skipping control frames. Public API surface is exported from the client module. Includes unit and behavioural tests, plus documentation and roadmap updates.

Changes

  • New module added: src/client/streaming_helpers.rs implementing the StreamingResponseExt trait and the TypedResponseStream adapter.
  • Public API exports updated: src/client/mod.rs now re-exports StreamingResponseExt and TypedResponseStream from the new module.
  • Unit tests added: src/client/tests/streaming_helpers.rs covering typed-item order, control-frame skipping, mapper errors, correlation/disconnect propagation, and empty streams.
  • Behavioural tests updated: extended client streaming BDD suite to exercise typed consumption via the new helper (scenarios around typed items and control-frame skipping).
  • Test fixtures updated: tests/fixtures/client_streaming.rs and related modules adjusted to support the new helper usage and typed-stream testing.
  • Documentation & roadmap updates:
    • Updated client and streaming design/docs to reflect the new helper surface.
    • Updated docs/roadmap.md to mark 11.3.1 as DONE after full validation.
    • Adjusted tests/features/client_streaming.feature to include scenarios for typed consumption and control-frame interleaving.
  • Test refactor to remove duplication: extracted common streaming-test helpers into src/client/tests/streaming_helpers.rs and updated fixtures/tests to rely on the new helper surface, reducing duplication across test setup and assertions.

Rationale

  • Provides an ergonomic, trait-based surface to convert a protocol frame stream into a domain-item stream, skipping control frames and surfacing errors without changing transport semantics or existing API boundaries.
  • Keeps the core transport/borrowing model intact while reducing boilerplate across multiplexed protocols.

Plan of Work (Stage A–D)

  • Stage A: lock the helper boundary and file layout
    • Introduce a new src/client/streaming_helpers.rs module and export the API from wireframe::client.
    • Start with a module-level //! doc describing purpose and layering.
  • Stage B: write failing tests first
    • Add unit tests in src/client/tests/streaming_helpers.rs and extend BDD harness for typed-consumption scenarios.
  • Stage C: implement the helper trait and adapter type
    • Implement the adapter stream that maps protocol frames to domain items, skips control frames, preserves order, and surfaces errors.
    • Include Rustdoc examples on the public surface.
  • Stage D: update documentation and roadmap
    • Update client/streaming design docs and the roadmap to reflect the shipped API and DONE status for 11.3.1.

Validation Plan

  • Run formatting, linting, docs checks, and tests as described in the ExecPlan:
    • make fmt, make markdownlint, make check-fmt, make lint, make test, make test-doc, make doctest-benchmark, make nixie.
  • Ensure unit tests and BDD behavioural tests pass, and Rustdoc examples compile under make test-doc.
  • Update client and streaming design docs and the roadmap accordingly; mark 11.3.1 as DONE once full validation passes.

Progress & Status

  • Stage A–C completed: helper boundary, tests, and implementation are in place.
  • Stage D completed: documentation and roadmap updates applied; 11.3.1 marked DONE in docs/roadmap.md after full validation.
  • The PR ships public API surface for ergonomic typed streaming consumption and accompanying tests.

Risks & Considerations

  • Risk of API surface expansion and potential overlap with existing surface areas. Mitigated by keeping the helper additive, client-side, and focused on mapping frames to domain values while preserving transport semantics.

Follow-ups

  • If needed, broaden support to other frame streams or response::FrameStream later, depending on design needs and validation results.
  • Keep docs in sync with any future refinements to the API surface.

📎 Task: https://www.devboxer.com/task/91e39bd0-7323-4ef7-be82-20ef578b8cdb

📎 Task: https://www.devboxer.com/task/f0684432-d8ef-4d2e-8dc2-abbdff16b0e1

📎 Task: https://www.devboxer.com/task/a7f1c718-af9c-4e64-a005-865c381eb922

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Mar 21, 2026

Reviewer's Guide

Adds a new client-side streaming helper layer (StreamingResponseExt::typed_with and TypedResponseStream) to adapt streaming response frames into typed domain items while skipping control frames, refactors client streaming fixtures/tests to use shared helpers, and updates BDD scenarios and documentation/roadmap to reflect the new API and mark roadmap item 11.3.1 as done.

Class diagram for new client streaming helpers

classDiagram
    class WireframeClient {
        +builder() WireframeClientBuilder
        +call_streaming<P>(request: P) ResponseStream~P~
        +receive_streaming<P>() ResponseStream~P~
    }

    class ResponseStream~P~ {
        +poll_next() Option~Result~P, ClientError~~
        +is_terminated() bool
        -client_ref : &mut WireframeClient
        -marker_P : PhantomData~P~
    }

    class StreamingResponseExt~P~ {
        <<trait>>
        +typed_with<Item, Mapper>(self, mapper: Mapper) TypedResponseStream~Self, Mapper, P, Item~
    }

    class TypedResponseStream~S, Mapper, P, Item~ {
        +poll_next() Option~Result~Item, ClientError~~
        -inner : S
        -mapper : Mapper
        -_phantom : PhantomData~fn() -> (P, Item)~
    }

    class ClientError {
    }

    %% Relationships
    WireframeClient --> ResponseStream~P~ : creates

    ResponseStream~P~ ..|> StreamingResponseExt~P~ : implements
    class S {
    }

    TypedResponseStream~S, Mapper, P, Item~ *-- S : wraps
    TypedResponseStream~S, Mapper, P, Item~ ..> StreamingResponseExt~P~ : constructed_via
    TypedResponseStream~S, Mapper, P, Item~ ..> ClientError : propagates
Loading

File-Level Changes

Change Details Files
Introduce a generic streaming helper trait and adapter to map protocol frames into typed domain items while skipping control frames and preserving transport semantics.
  • Add new src/client/streaming_helpers.rs module defining StreamingResponseExt extension trait and TypedResponseStream wrapper stream
  • Implement TypedResponseStream as a Stream<Item = Result<Item, ClientError>> that repeatedly polls the inner Stream<Item = Result<P, ClientError>>, applies a mapper FnMut(P) -> Result<Option<Item>, ClientError>, yields Some(item) in order, skips Ok(None), and forwards underlying ClientErrors and termination unchanged
  • Provide Rustdoc examples showing usage against WireframeClient::call_streaming and against a generic futures stream
  • Re-export StreamingResponseExt and TypedResponseStream from src/client/mod.rs
src/client/streaming_helpers.rs
src/client/mod.rs
Add focused unit tests for the streaming helpers to validate ordering, control-frame skipping, error propagation, and empty-stream behaviour.
  • Create src/client/tests/streaming_helpers.rs to test helper behaviour against the existing streaming test infrastructure
  • Cover ordered item mapping, control-frame skipping via Ok(None), mapper-produced errors stopping the stream, propagation of correlation mismatches, disconnects, decode failures, and preservation of empty streams
  • Wire the new test module into src/client/tests/mod.rs
src/client/tests/streaming_helpers.rs
src/client/tests/mod.rs
Refactor BDD client streaming fixtures to share stream-draining logic and to exercise the new typed helper in end-to-end scenarios, including control-frame skipping.
  • Extend ClientStreamingWorld with typed_items storage and helper methods build_request, reset_state, generic drain_stream, and execute_stream_call to reduce duplication between frame-level and typed-level streaming tests
  • Refactor send_streaming_request to use execute_stream_call and add send_typed_streaming_request that calls .typed_with(map_streaming_item) on the response stream and accumulates TypedStreamingItems
  • Introduce TypedStreamingItem newtype plus map_streaming_item mapper that maps specific frame IDs to typed items, treats some IDs as control frames (Ok(None)), and errors on unexpected IDs
  • Add typed_items-based verification method verify_typed_item_order and ensure world reset clears both frames and typed items
  • Augment streaming server modes with ControlInterleaved variant that interleaves data and control frames, plus start_control_interleaved_server helper
tests/fixtures/client_streaming.rs
tests/fixtures/client_streaming/modes.rs
Extend BDD steps, scenarios, and feature definitions to cover typed streaming consumption and control-frame skipping via the helper.
  • Add Given step to start a control-interleaved streaming server and When step to consume the stream through the typed helper, wired to the new world methods
  • Add Then step typed items are received in order as {expected} that parses a comma-separated list of bytes and asserts typed-item ordering via verify_typed_item_order
  • Register new BDD scenarios for typed stream consumption and control-frame skipping in tests/scenarios/client_streaming_scenarios.rs
  • Extend tests/features/client_streaming.feature with two scenarios: one that consumes typed items and another that verifies control-frame skipping when the server interleaves control and data frames
tests/steps/client_streaming_steps.rs
tests/scenarios/client_streaming_scenarios.rs
tests/features/client_streaming.feature
Update user and design documentation plus roadmap to describe the new helper, its semantics, and to mark roadmap item 11.3.1 as completed with an accompanying ExecPlan.
  • In docs/users-guide.md, add guidance on when to use manual StreamExt::next vs StreamingResponseExt::typed_with, and include a full example showing mapping of frames into typed rows with control-frame skipping and preserved error semantics
  • In docs/multi-packet-and-streaming-responses-design.md, document the typed_with mapper contract (Ok(Some), Ok(None), Err), its relationship to ResponseStream, and how it preserves error handling and termination behaviour
  • In docs/wireframe-client-design.md, add a decision record for 11.3.1 explaining why a trait + adapter was chosen, emphasize that the helper is additive and preserves borrow/transport semantics, and highlight it as the recommended layer for multiplexed protocols
  • Create docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md capturing constraints, risks, public API, plan of work, validation steps, and outcomes for this roadmap item
  • Mark roadmap item 12.3.1 (helper traits/macros for streaming responses) as done in docs/roadmap.md and fix minor formatting in nearby roadmap text
docs/users-guide.md
docs/multi-packet-and-streaming-responses-design.md
docs/wireframe-client-design.md
docs/roadmap.md
docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 21, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 4f4c9207-2b0f-4fe4-a755-a845abc03f23

📥 Commits

Reviewing files that changed from the base of the PR and between 6192510 and 09c6d6a.

📒 Files selected for processing (1)
  • docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md

Summary by CodeRabbit

  • New Features

    • Added a trait-based helper and public streaming utilities for typed, selectable frame consumption.
  • Documentation

    • New ExecPlan and updates to user guide, design docs and roadmap describing the helper’s contract, compatibility constraints and validation steps.
  • Tests

    • Added unit and BDD scenarios, fixtures and verification helpers exercising typed-stream behaviour, ordering, error propagation and control-frame skipping.

Walkthrough

Summarise the addition of a trait-based streaming adapter: StreamingResponseExt::typed_with wraps protocol frames into a typed TypedResponseStream via a mapper closure, yielding typed items, skipping control frames, and forwarding underlying ClientErrors without changing transport, correlation or termination semantics.

Changes

Cohort / File(s) Summary
Documentation
docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md, docs/multi-packet-and-streaming-responses-design.md, docs/users-guide.md, docs/wireframe-client-design.md, docs/roadmap.md
Add ExecPlan and design notes for roadmap item 11.3.1; document StreamingResponseExt::typed_with contract and mapper semantics (Ok(Some) yields, Ok(None) skips, Err terminates); update error/termination behaviour, usage examples and roadmap checklist formatting.
Core API Implementation
src/client/mod.rs, src/client/streaming_helpers.rs
Add streaming_helpers module and public re-exports; implement StreamingResponseExt::typed_with and TypedResponseStream that map Stream<Item = Result<P, ClientError>>Stream<Item = Result<Item, ClientError>>, preserve fused termination, forward underlying ClientErrors unchanged, and set fused state on mapper error.
Unit Tests
src/client/tests/mod.rs, src/client/tests/streaming_helpers.rs
Register new tests module and add async tests covering ordering, control-frame skipping, mapper-error fusion/termination, correlation mismatches, disconnect propagation and malformed-frame propagation.
BDD Scenarios
tests/features/client_streaming.feature, tests/scenarios/client_streaming_scenarios.rs, tests/steps/client_streaming_steps.rs
Add two feature scenarios and matching scenario functions and step definitions to exercise typed-stream extraction and control-frame skipping using the new typed request step.
Test Fixtures & Helpers
tests/fixtures/client_streaming.rs, tests/fixtures/client_streaming/modes.rs
Refactor streaming test helpers into shared executor; add send_typed_streaming_request, TypedStreamingItem type and accessor, ControlInterleaved server mode, server start helper and verify_typed_item_order; update imports to use StreamingResponseExt.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant Call as call_streaming()
  participant ResponseStream
  participant Typed as TypedResponseStream
  participant Consumer

  Client->>Call: invoke call_streaming(...)
  Call->>ResponseStream: produce frames (Result<P, ClientError>)
  ResponseStream->>Typed: deliver frame/result
  Typed->>Typed: invoke mapper(P) -> Result<Option<Item>, ClientError>
  alt mapper => Ok(Some(item))
    Typed->>Consumer: yield Ok(item)
  else mapper => Ok(None)
    Typed-->>Typed: skip control frame
  else mapper => Err(err)
    Typed->>Consumer: yield Err(err) and terminate (fused)
  end
  ResponseStream-->>Typed: None (terminator)
  Typed-->>Consumer: None (stream ends)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🌊 A mapper skips the noise, yields gold in steady streams,
Frames turn to meaning, ordered as bright beams,
Control frames pass unseen, errors speak their name,
Borrow stays exclusive, transport keeps its frame,
Streams finish clean or fuse when faults proclaim.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed Title accurately describes the primary change: introduction of a public trait and adapter type for typed streaming response consumption.
Description check ✅ Passed Description comprehensively relates to the changeset, detailing new modules, API exports, unit and behavioural tests, fixture updates, and documentation changes across all modified files.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch add-streaming-response-helpers-jxq7p8

Comment @coderabbitai help to get the list of available commands and usage tips.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos leynos changed the title Add ExecPlan for ergonomic streaming-response helpers (11.3.1) Implement typed streaming-response helpers (11.3.1) Mar 22, 2026
codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 22, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:

tests/fixtures/client_streaming.rs

Comment on file

        let mut stream = client.call_streaming::<StreamTestEnvelope>(request).await?;

        self.received_frames.clear();
        self.typed_items.clear();

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: ClientStreamingWorld.send_streaming_request,ClientStreamingWorld.send_typed_streaming_request

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 22, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:

src/client/tests/streaming_helpers.rs

Comment on file

async fn typed_response_stream_yields_mapped_items_in_order(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![10]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![20]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(3),
            cid,
            Payload::new(vec![30]),
        ),
        super::streaming_infra::TestStreamEnvelope::terminator(cid),
    ];
    let (mut client, _server) = setup_streaming_test(frames).await?;

    let items: Vec<Vec<u8>> = client
        .call_streaming(build_request(cid))
        .await?
        .typed_with(|frame| Ok(Some(frame.payload)))
        .try_collect()
        .await?;

    assert_eq!(items, vec![vec![10], vec![20], vec![30]]);
    Ok(())
}

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: typed_response_stream_skips_control_frames,typed_response_stream_yields_mapped_items_in_order

@coderabbitai

This comment was marked as resolved.

@coderabbitai

This comment was marked as resolved.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 23, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response.

tests/fixtures/client_streaming.rs

Comment on file

        let request = Self::build_request();
        self.reset_state();
        let mut client = self.client.take().ok_or("client not connected")?;
        {
            let stream = client.call_streaming::<StreamTestEnvelope>(request).await?;
            self.drain_stream(stream, |world, frame| world.received_frames.push(frame))
                .await;
        }
        self.client = Some(client);

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: ClientStreamingWorld.send_streaming_request,ClientStreamingWorld.send_typed_streaming_request

@coderabbitai

This comment was marked as resolved.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 23, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response.

Code Duplication

src/client/tests/streaming_helpers.rs:

What lead to degradation?

The module contains 2 functions with similar structure: typed_response_stream_skips_control_frames,typed_response_stream_yields_mapped_items_in_order

Why does this problem occur?

Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health.

How to fix it?

A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. Read More
Once you have identified the similarities across functions, look to extract and encapsulate the concept that varies into its own function(s). These shared abstractions can then be re-used, which minimizes the amount of duplication and simplifies change.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 23, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response.

Code Duplication

tests/fixtures/client_streaming.rs:

What lead to degradation?

The module contains 2 functions with similar structure: ClientStreamingWorld.send_streaming_request,ClientStreamingWorld.send_typed_streaming_request

Why does this problem occur?

Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health.

How to fix it?

A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. Read More
Once you have identified the similarities across functions, look to extract and encapsulate the concept that varies into its own function(s). These shared abstractions can then be re-used, which minimizes the amount of duplication and simplifies change.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 23, 2026

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
cat src/client/tests/streaming_helpers.rs

Length of output: 8269


File: src/client/tests/streaming_helpers.rstyped_response_stream_yields_mapped_items_in_order, typed_response_stream_skips_control_frames

@leynos: This fix has already been applied. The collect_typed_items helper is present in the file (lines 31–52), and both typed_response_stream_yields_mapped_items_in_order and typed_response_stream_skips_control_frames now delegate to it, with only their distinct frame payloads, mapper closure, and assertion remaining in each test body. No further changes are required.

@coderabbitai

This comment was marked as resolved.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 23, 2026

@coderabbitai

Re: Task: eliminate remaining structural duplication in tests/fixtures/client_streaming.rs

The engineer writes:

The specification appears to have been written without accounting for this Rust limitation. I recommend one of:

  1. Accept a slightly different signature that achieves the same goal without HRTB trait objects
  2. Contact the task author to clarify if there's a known workaround
  3. Relax the clippy check for this specific pattern if the tests pass

The duplication has been eliminated and the tests work correctly - it's purely a lifetime verification issue with the strictest clippy settings.

@coderabbitai

This comment was marked as resolved.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 23, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. Ensure that this is validated against the current version of the codegraph.

If further refinement to address this finding would be deleterious, please supply a clear explanatory one to two paragraph markdown message I can paste into the CodeScene web ui's diagnostic suppression function so this diagnostic can be silenced.

Code Duplication

src/client/tests/streaming_helpers.rs:

What lead to degradation?

The module contains 2 functions with similar structure: typed_response_stream_skips_control_frames,typed_response_stream_yields_mapped_items_in_order

Why does this problem occur?

Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health.

How to fix it?

A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. Read More
Once you have identified the similarities across functions, look to extract and encapsulate the concept that varies into its own function(s). These shared abstractions can then be re-used, which minimizes the amount of duplication and simplifies change.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 23, 2026

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
cat src/client/tests/streaming_helpers.rs

Length of output: 8269


File: src/client/tests/streaming_helpers.rstyped_response_stream_yields_mapped_items_in_order, typed_response_stream_skips_control_frames

@leynos: This fix is already in place. The collect_typed_items helper (lines 31–52) owns the entire setup-and-collect skeleton, and both typed_response_stream_yields_mapped_items_in_order and typed_response_stream_skips_control_frames now delegate to it — each retaining only its distinct frame payloads, mapper closure, and assert_eq!. No further changes are required for this issue.

leynos and others added 4 commits March 23, 2026 19:35
This adds a new execplan document (11-3-1-helpers-for-consuming-streaming-responses.md) that extensively lays out the design, constraints, risks, progress, and plan for implementing ergonomic helpers to consume streaming responses. It addresses roadmap item 11.3.1 and provides rationale, test strategy, API sketches, and documentation plans to improve usability of client streaming APIs without changing existing transport or borrow semantics.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
- Introduce `StreamingResponseExt` trait with method `typed_with` to adapt streaming response frames into typed domain items.
- Add `TypedResponseStream` stream adapter to map frames, allowing skipping control frames while preserving error propagation and stream termination semantics.
- Update client docs with usage examples demonstrating ergonomic typed consumption for multiplexed protocols.
- Extend integration tests and fixtures to verify correct skipping of control frames and error handling.
- Document design decisions and update roadmap to mark completion of streaming helpers milestone (11.3.1).

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…on logic

- Introduced `collect_typed_items` helper to reduce duplication in streaming_helpers.rs tests.
- Simplified test code by using the new helper function.
- In client_streaming.rs, refactored ClientStreamingWorld to add `reset_state` and `drain_stream` helpers.
- Refactored streaming request methods to use these helpers, improving readability and maintainability.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…cation

Introduced the `execute_streaming` helper method in ClientStreamingWorld to consolidate
common logic used in sending streaming requests in the client streaming tests.
Updated `send_streaming_request` and `send_typed_streaming_request` to use this helper,
improving code readability and maintainability by removing duplicated code blocks.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
@coderabbitai

This comment was marked as resolved.

@devboxerhub devboxerhub Bot force-pushed the add-streaming-response-helpers-jxq7p8 branch from a89e043 to 11882cb Compare March 23, 2026 19:43
codescene-delta-analysis[bot]

This comment was marked as outdated.

…re draining

Refactored `execute_stream_call` in `client_streaming.rs` to collect all streamed items into a vector before draining instead of directly processing the stream. This simplifies the logic by removing the complex stream type and managing a collected vector of items, improving readability and maintenance of the client streaming test fixture.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 26, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. Ensure that this is validated against the current version of the codegraph.

If further refinement to address this finding would be deleterious, please supply a clear explanatory one to two paragraph markdown message I can paste into the CodeScene web ui's diagnostic suppression function so this diagnostic can be silenced.

tests/fixtures/client_streaming.rs

Comment on file

        self.execute_stream_call(
            |client| {
                Box::pin(async move {
                    let stream = client
                        .call_streaming::<StreamTestEnvelope>(Self::build_request())
                        .await?;
                    Ok(stream.collect().await)
                })
            },
            |world, frame| world.received_frames.push(frame),
        )
        .await
    }

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: ClientStreamingWorld.send_streaming_request,ClientStreamingWorld.send_typed_streaming_request

@coderabbitai

This comment was marked as resolved.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos leynos marked this pull request as ready for review March 26, 2026 14:07
sourcery-ai[bot]

This comment was marked as resolved.

@coderabbitai coderabbitai Bot added the codex label Mar 26, 2026
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5130b9dbbc

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/client/streaming_helpers.rs Outdated
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/wireframe-client-design.md`:
- Around line 481-492: Replace the American spelling "adapter" with the British
spelling "adaptor" in the decision record for 11.3.1; specifically update the
phrase that currently reads "adapter stream" to "adaptor stream" so it matches
the document's existing usage (e.g., elsewhere and around the mention of
StreamingResponseExt::typed_with and the decision header "Decision record for
11.3.1").
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 29218589-d7e3-4a9a-9988-34dd267ad41c

📥 Commits

Reviewing files that changed from the base of the PR and between bb1e2e1 and 5130b9d.

📒 Files selected for processing (14)
  • docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md
  • docs/multi-packet-and-streaming-responses-design.md
  • docs/roadmap.md
  • docs/users-guide.md
  • docs/wireframe-client-design.md
  • src/client/mod.rs
  • src/client/streaming_helpers.rs
  • src/client/tests/mod.rs
  • src/client/tests/streaming_helpers.rs
  • tests/features/client_streaming.feature
  • tests/fixtures/client_streaming.rs
  • tests/fixtures/client_streaming/modes.rs
  • tests/scenarios/client_streaming_scenarios.rs
  • tests/steps/client_streaming_steps.rs

Comment thread docs/wireframe-client-design.md
…ve tests to streaming helpers

- Introduced `typed_with` method on streaming response to map frames to typed items with error handling.
- Added detailed examples demonstrating usage of typed_with with a mapper function.
- Improved doc formatting and clarified potential BDD risks in consuming streaming responses.
- Added extensive new tests in streaming_helpers.rs for typed response streaming, covering skipping control frames, surfacing mapper errors, and empty item handling.
- Refactored typed_with trait constraints for ergonomic usage.

This enhances streaming response handling with typed consumption and robust validation.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Mar 26, 2026

@coderabbitai Have the following now been resolved?

Annul any requirements that violate the en-GB-oxendict spelling (-ize / -yse / -our) conventions (for example a request to replace "normalize" with "normalise" or "artefact" with "artifact"), or where the requirement unnecessarily increases cyclomatic complexity.

Please address the comments from this code review:

## Overall Comments
- The new `execute_stream_call` helper in `ClientStreamingWorld` uses a fairly complex `for<'a> FnOnce(...) -> Pin<Box<dyn Future + 'a>>` signature; consider making it generic over the future type (e.g. `F: for<'a> FnOnce(&'a mut WireframeClient) -> Fut` with `Fut: Future + 'a`) to avoid the boxed trait object and make the call site easier to reason about.
- There are now two separate `build_request` helpers (one in the BDD fixture and one in `streaming_helpers` tests) that construct the same kind of envelope; consider consolidating this into a shared helper to avoid subtle divergence between test setups over time.

## Individual Comments

### Comment 1
<location path="src/client/streaming_helpers.rs" line_range="106-117" />
<code_context>
+/// # Ok(())
+/// # }
+/// ```
+pub trait StreamingResponseExt<P>: Stream<Item = Result<P, ClientError>> + Sized {
+    /// Adapt protocol frames into typed items, skipping control frames when
+    /// the mapper returns `Ok(None)`.
+    #[must_use]
+    fn typed_with<Item, Mapper>(self, mapper: Mapper) -> TypedResponseStream<Self, Mapper, P, Item>
+    where
+        Self: Unpin,
+        Mapper: FnMut(P) -> Result<Option<Item>, ClientError> + Unpin,
+    {
+        TypedResponseStream::new(self, mapper)
+    }
+}
+
+impl<S, P> StreamingResponseExt<P> for S where S: Stream<Item = Result<P, ClientError>> + Sized {}
+
+/// Stream adapter that maps protocol frames into domain items.
</code_context>
<issue_to_address>
**suggestion:** Loosen `Unpin` constraints from the extension trait API surface.

The `typed_with` method and trait don’t actually need `Unpin` bounds:

- `typed_with` just forwards `self` and `mapper` into `TypedResponseStream::new` without pinning.
- The `Stream` impl for `TypedResponseStream` is where `S: Unpin` is required, so callers using the adapter will still need `Unpin` on the underlying stream.
- `Mapper` is only ever used via `&mut`, so it doesn’t need `Unpin`.

You can drop the `Self: Unpin` and `Mapper: Unpin` bounds from the trait and method, and keep `S: Unpin` only on the `impl Stream for TypedResponseStream<...>` block, while preserving behavior and making the API more flexible.

```suggestion
pub trait StreamingResponseExt<P>: Stream<Item = Result<P, ClientError>> + Sized {
    /// Adapt protocol frames into typed items, skipping control frames when
    /// the mapper returns `Ok(None)`.
    #[must_use]
    fn typed_with<Item, Mapper>(self, mapper: Mapper) -> TypedResponseStream<Self, Mapper, P, Item>
    where
        Mapper: FnMut(P) -> Result<Option<Item>, ClientError>,
    {
        TypedResponseStream::new(self, mapper)
    }
}
```
</issue_to_address>

### Comment 2
<location path="src/client/tests/streaming_helpers.rs" line_range="53-62" />
<code_context>
+    Ok(())
+}
+
+#[rstest]
+#[tokio::test]
+async fn typed_response_stream_skips_control_frames(
+    correlation_id: CorrelationId,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for streams that contain only control frames (all `Ok(None)` from the mapper).

We already cover mixed data/control frames and empty streams, but not the case where the underlying stream yields several frames and the mapper returns `Ok(None)` for all of them (non-empty protocol stream, empty typed stream). Please add a test like `typed_response_stream_all_control_frames_yields_no_items` that sends several frames with the mapper always returning `Ok(None)`, and assert that the adapter drains the stream and completes without yielding any items.

Suggested implementation:

```rust
#[rstest]
#[tokio::test]
async fn typed_response_stream_all_control_frames_yields_no_items(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;

    // Underlying stream yields several data frames for the same correlation id
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![10]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![20]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(3),
            cid,
            Payload::new(vec![30]),
        ),
    ];

    // Mapper treats every frame as a control frame by always returning Ok(None)
    let mapper = |_payload| Ok(None::<()>);

    let (mut client, _server) = setup_streaming_test(frames).await?;
    let items: Vec<()> = client
        .call_streaming(build_request(cid))
        .await?
        .typed_with(mapper)
        .try_collect()
        .await?;

    // Non-empty protocol stream, but the typed stream must be empty and fully drained
    assert!(items.is_empty());

    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_yields_mapped_items_in_order(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

```

This new test assumes that:
1. `setup_streaming_test`, `build_request`, `MessageId`, and `Payload` are already in scope in this module (they are used elsewhere in the file).
2. The `typed_with` adapter accepts a mapper of the form `Fn(_) -> Result<Option<T>, _>` and can infer `T = ()` from `Ok(None::<()>)`.

If the existing tests use a different mapper signature (e.g., a specific error type), you may need to:
- Adjust the mapper closure’s return type accordingly, e.g. `let mapper = |_payload| -> Result<Option<()>, SomeError> { Ok(None) };`.
- Align the `items` type annotation (`Vec<()>`) with whatever typed item type is used in your other control-frame tests.
</issue_to_address>

### Comment 3
<location path="src/client/tests/streaming_helpers.rs" line_range="122-80" />
<code_context>
+async fn typed_response_stream_surfaces_mapper_errors(
</code_context>
<issue_to_address>
**suggestion (testing):** Exercise mapper errors that occur after one or more skipped control frames.

The existing test covers mapper errors when there are no prior control frames. To also validate the control-frame–skipping behavior, add a variant where the mapper returns `Ok(None)` for one or more frames before returning `Err` on a later frame, and assert that only items before the error are yielded, the error is surfaced next, and no subsequent frames are processed.

Suggested implementation:

```rust
    let cid = correlation_id;
    let frames = vec![
        // This first frame is intended to be skipped by the mapper (Ok(None))
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(0),
            cid,
            Payload::new(vec![0]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![1]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(

```

To fully implement the test behavior described in your comment, you should also:

1. Update the mapper used in `typed_response_stream_surfaces_mapper_errors` so that:
   - For the first frame (e.g. where `message_id == MessageId::new(0)` or `payload == vec![0]`), it returns `Ok(None)` to simulate a control frame being skipped.
   - For a later frame (e.g. `message_id == MessageId::new(2)` or some other distinguishing condition), it returns `Err(...)` to surface the mapper error.
   - For any frames between the first skipped one and the error frame, it returns `Ok(Some(item))` so that you can assert they are yielded before the error.

   A typical pattern inside the mapper closure would be:
   ```rust
   let mut seen = 0;
   let mapper = move |frame: TestStreamEnvelope<_>| {
       seen += 1;
       match seen {
           1 => Ok(None),                  // skip first (control-like) frame
           2 => Ok(Some(vec![1])),         // yield one item
           3 => Err(anyhow!("mapper err")),// error after a skipped frame
           _ => panic!("should not be called after error"),
       }
   };
   ```

2. Extend the assertions in the test to validate:
   - Only items produced before the error are collected (e.g. `items == vec![vec![1]]`).
   - The next poll on the stream (or the join handle result, depending on how the helper returns errors) surfaces the mapper error.
   - No subsequent frames (e.g. a 4th frame you might append to `frames`) are processed—e.g. by panicking in the mapper on `seen >= 4` as shown above.

3. If you prefer not to change the existing test semantics for “no prior control frames”, you can instead:
   - Keep `typed_response_stream_surfaces_mapper_errors` as-is.
   - Add a separate test (e.g. `typed_response_stream_surfaces_mapper_errors_after_skipped_frames`) that uses the modified `frames` vector and mapper logic above to specifically cover the “after one or more skipped control frames” scenario.
</issue_to_address>

### Comment 4
<location path="tests/fixtures/client_streaming/modes.rs" line_range="31-35" />
<code_context>
 pub enum StreamingServerMode {
     /// Send `data_count` data frames then a terminator.
     Normal { data_count: usize },
+    /// Send data frames interleaved with control frames, then a terminator.
+    ControlInterleaved,
     /// Send one frame with a wrong correlation ID.
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a BDD scenario where the typed mapper returns an error for an unexpected frame id.

Right now, the unexpected-frame-id branch in `map_streaming_item` is only covered by `src/client/tests/streaming_helpers.rs`. Consider adding a server mode that emits a frame with an unsupported `id`, plus a Gherkin scenario that asserts the client sees a `ClientError` from the typed helper, so mapper errors are exercised in an end-to-end flow as well.

Suggested implementation:

```rust
pub enum StreamingServerMode {
    /// Send `data_count` data frames then a terminator.
    Normal { data_count: usize },
    /// Send data frames interleaved with control frames, then a terminator.
    ControlInterleaved,
    /// Send one data frame with an unsupported message id, then a terminator.
    UnsupportedMessageId,
    /// Send one frame with a wrong correlation ID.
    Mismatch,

```

To fully implement the behavior described in your review comment, the following additional changes are required in this repo:

1. In the same `modes.rs` file, locate the async function that matches on `StreamingServerMode` (the one that already has arms for `Normal`, `ControlInterleaved`, `Mismatch`, etc.). Add a new match arm:

   - Pattern: `StreamingServerMode::UnsupportedMessageId => { ... }`
   - Behavior:
     - Build a sequence of frames that includes at least one `StreamTestEnvelope::data` with a `MessageId` value that is *not* mapped by the typed helper (i.e., an id that triggers the “unexpected frame id” branch in `map_streaming_item`), followed by a terminator for the same correlation id:
       - Example structure (adapt to existing helpers):

         ```rust
         StreamingServerMode::UnsupportedMessageId => {
             let frames = vec![
                 StreamTestEnvelope::data(
                     MessageId::new(999), // pick an id not used by the typed mapper
                     cid,
                     Payload::new(vec![9]),
                 ),
                 StreamTestEnvelope::terminated(cid),
             ];
             send_frames(framed_transport, frames).await;
             true
         }
         ```

       - Use the same helper (`send_frames`, `send_data_and_terminator`, or equivalent) and coding style as other arms in this match.

2. In the BDD test layer (likely under `tests/fixtures/client_streaming` or a sibling folder where other streaming client feature files live):

   - Add a new Gherkin scenario to the appropriate `.feature` file (or create a new one if needed). The scenario should:
     - Start the streaming server in `UnsupportedMessageId` mode (e.g., via a step like “Given the streaming server runs in mode `UnsupportedMessageId`” or similar, matching existing step wording).
     - Run the client using the typed streaming helper that uses `map_streaming_item`.
     - Assert that the client receives a `ClientError` (or whatever the concrete error type/enum variant is) originating from the typed mapper when it encounters the unsupported message id.

   - Implement or extend the step definition that maps the textual mode name in the Gherkin step to the `StreamingServerMode::UnsupportedMessageId` variant, alongside the existing ones for `Normal`, `ControlInterleaved`, etc.

3. If there is a centralized typed mapper registry or enum-to-handler mapping used by `map_streaming_item`, confirm that the chosen `MessageId` value for `UnsupportedMessageId` is *not* present in that registry, so the “unexpected frame id” branch is exercised.

These additional changes will ensure that the “unexpected frame id” path is covered end-to-end: the server emits a frame with an unsupported id, the typed helper attempts to map it, and the client surface is verified to see a `ClientError` from the mapper.
</issue_to_address>

### Comment 5
<location path="src/client/streaming_helpers.rs" line_range="106" />
<code_context>
+/// # Ok(())
+/// # }
+/// ```
+pub trait StreamingResponseExt<P>: Stream<Item = Result<P, ClientError>> + Sized {
+    /// Adapt protocol frames into typed items, skipping control frames when
+    /// the mapper returns `Ok(None)`.
</code_context>
<issue_to_address>
**issue (review_instructions):** Add both unit and behavioural tests that exercise `StreamingResponseExt::typed_with` and `TypedResponseStream` end-to-end.

Add focused `rstest` unit tests in `src/client/tests/streaming_helpers.rs` that cover ordered delivery, control-frame skipping via `Ok(None)`, mapper error propagation, and forwarding of underlying `ClientError`s.

In addition, add or update `rstest-bdd` scenarios under `tests/features/client_streaming.feature` (and corresponding fixtures/steps) to exercise `typed_with` against the real `WireframeClient` streaming path, so the new helper is covered by behavioural tests rather than only docs and unit tests.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*`

**Instructions:**
Rules:
- For any new feature or change to an existing feature, both behavioural *and* unit tests are required.

</details>
</issue_to_address>

### Comment 6
<location path="src/client/streaming_helpers.rs" line_range="91" />
<code_context>
+/// let rows: Vec<Row> = client
+///     .call_streaming::<MyEnvelope>(request)
+///     .await?
+///     .typed_with(|frame| match frame.id {
+///         1 => Ok(Some(Row(frame.payload))),
+///         2 => Ok(None),
</code_context>
<issue_to_address>
**suggestion (review_instructions):** The `match frame.id` with three branches should be extracted into a small predicate/helper function to comply with the multi-branch conditional guideline, even in this example code.

This example uses a `match frame.id` with three branches directly inside the closure passed to `typed_with`. Per the guideline, conditionals with more than two branches should be moved into a predicate/helper function.

Here, you could define a small helper like `fn map_frame(frame: MyEnvelope) -> Result<Option<Row>, ClientError>` and then call it from the closure:

```rust
.typed_with(|frame| map_frame(frame))
```

or even pass the function directly if the signature matches. That keeps the branching logic encapsulated and aligns the example with the project’s conditional/predicate rule.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.rs`

**Instructions:**
Move conditionals with >2 branches into a predicate function.

</details>
</issue_to_address>

### Comment 7
<location path="docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md" line_range="139" />
<code_context>
+  keep mapper failures and transport failures distinct in the helper's return
+  type and document that split explicitly.
+
+- Risk: BDD scenarios can pass by exercising fixture-local convenience methods
+  instead of the new public helper. Severity: medium. Likelihood: medium.
+  Mitigation: the fixture must call the public helper API directly, and the
</code_context>
<issue_to_address>
**suggestion (review_instructions):** The acronym “BDD” is used here without being defined on first use, which conflicts with the requirement to define uncommon acronyms.

Consider expanding the acronym the first time it appears, for example: “Risk: behaviour-driven development (BDD) scenarios can pass …”. Subsequent uses can then rely on the acronym alone.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.md`

**Instructions:**
Define uncommon acronyms on first use.

</details>
</issue_to_address>

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 26, 2026

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
echo "=== src/client/streaming_helpers.rs ==="
cat src/client/streaming_helpers.rs

echo ""
echo "=== src/client/tests/streaming_helpers.rs ==="
cat src/client/tests/streaming_helpers.rs

echo ""
echo "=== tests/fixtures/client_streaming.rs ==="
cat tests/fixtures/client_streaming.rs

echo ""
echo "=== tests/fixtures/client_streaming/modes.rs ==="
cat tests/fixtures/client_streaming/modes.rs

echo ""
echo "=== docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md (first 60 lines) ==="
head -60 docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md

echo ""
echo "=== tests/features/client_streaming.feature ==="
cat tests/features/client_streaming.feature

echo ""
echo "=== tests/steps/client_streaming_steps.rs ==="
cat tests/steps/client_streaming_steps.rs

Length of output: 47835


Need to check the execplan for the BDD acronym definition before giving a final answer.


🏁 Script executed:

#!/bin/bash
# Find first occurrence of BDD in the execplan and surrounding context
grep -n "BDD\|behaviour-driven\|Behaviour-Driven" docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md | head -20

Length of output: 551


@leynos: Here is the full status across all items, with annulments applied.


Status summary

✅ Resolved

# Comment Evidence
1 Loosen Unpin constraints from typed_with Self: Unpin and Mapper: Unpin removed from the method/trait signature; S: Unpin + Mapper: Unpin retained only on impl Stream for TypedResponseStream.
2 Add all-control-frames test typed_response_stream_all_control_frames_yields_no_items present in src/client/tests/streaming_helpers.rs.
3 Mapper errors after skipped control frames typed_response_stream_surfaces_mapper_errors_after_skipped_frames present; implemented with a clean payload-comparison approach rather than the complexity-increasing mutable-counter closure suggested in the original comment.
5 Both unit and behavioural tests Unit tests in src/client/tests/streaming_helpers.rs; BDD scenarios in tests/features/client_streaming.feature with step definitions and fixtures wired up.
6 Extract multi-branch match from doc example fn map_frame(frame: MyEnvelope) -> Result<Option<Row>, ClientError> defined outside the closure; .typed_with(map_frame) used at the call site.

🚫 Annulled

# Comment Reason
4 Add UnsupportedMessageId server mode + BDD scenario The unexpected-frame-id path is already exercised by typed_response_stream_surfaces_mapper_errors and typed_response_stream_surfaces_mapper_errors_after_skipped_frames. Adding a new server-mode variant, Gherkin scenario, step definitions, and fixture method solely to duplicate that coverage at the BDD layer would increase cyclomatic complexity across four files without closing a genuine gap. Annulled under the rule that requirements that unnecessarily increase cyclomatic complexity are voided.

❌ Not yet resolved

Overall Comment 1 — execute_stream_call future genericity

execute_stream_call still boxes the future and collects into Vec<Result<Item, ClientError>> before re-streaming. The "consider" suggestion to make it generic over the future type (avoiding the box and the intermediate allocation) is still open. This is a style/performance improvement, not a correctness issue.

Overall Comment 2 — Two build_request helpers

tests/fixtures/client_streaming.rs has fn build_request() -> StreamTestEnvelope and src/client/tests/streaming_helpers.rs has fn build_request(cid: CorrelationId) -> TestStreamEnvelope. These helpers serve different module hierarchies (TestStreamEnvelope is internal to the unit-test infra; StreamTestEnvelope is a BDD fixture type). Consolidation would require exposing internal test infrastructure across module boundaries. Still open, but accepting divergence here is defensible.

Comment 7 — BDD acronym first-use definition

BDD appears first at line 78 of docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md without expansion. The expansion behaviour-driven development (BDD) is only introduced at line 139. Move the expansion to line 78 (or whichever is the true first use) and leave subsequent uses as BDD alone.

…ision record

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos leynos changed the title Add streaming helpers and refactor client_streaming tests Refactor client_streaming tests to remove duplication and add helpers Mar 26, 2026
Prevent polling the typed response stream after a mapper error by marking it as terminated. This ensures the stream returns None on subsequent polls, avoiding repeated errors and complying with the fused stream contract.

- Added 'terminated' flag to the stream wrapper to track termination state.
- Updated poll_next implementation to respect termination and stop polling.
- Added test to verify stream fusion behavior after mapper failure.
- Adjusted test fixture to propagate stream errors properly and mark clean termination.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
@leynos leynos changed the title Refactor client_streaming tests to remove duplication and add helpers Refactor client_streaming tests and add typed streaming helpers Mar 26, 2026
codescene-delta-analysis[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md`:
- Around line 358-363: Stage B behavioural coverage bullets in the markdown have
a semicolon-plus-conjunction carrying punctuation across two list items; in the
"Stage B behavioural coverage" bullet list of
11-3-1-helpers-for-consuming-streaming-responses.md change the first bullet's
trailing semicolon and "and" to a full stop and ensure the second bullet is a
standalone sentence ending with a full stop so both bullets read as parallel,
independent items; verify the two bullets that mention extending
tests/features/client_streaming.feature and updating
tests/fixtures/client_streaming.rs, tests/steps/client_streaming_steps.rs, and
tests/scenarios/client_streaming_scenarios.rs each end with a period and have no
connecting conjunction.

In `@src/client/streaming_helpers.rs`:
- Around line 110-119: The trait method StreamingResponseExt::typed_with
currently constructs a TypedResponseStream without requiring S: Unpin or Mapper:
Unpin, yet the Stream impl for TypedResponseStream calls get_mut() on a pinned
self (and requires S: Unpin and Mapper: Unpin), so add appropriate Unpin bounds
to the typed_with signature (e.g., require Self: Unpin and Mapper: Unpin) so
callers fail at the API boundary, or alternatively refactor
TypedResponseStream's Stream impl to use pin-project (decomposing the pinned
struct) so get_mut() is not used and !Unpin inputs are supported; update the
typed_with method signature accordingly if you choose the Unpin-bound approach,
or apply pin-project annotations to TypedResponseStream and its Stream impl to
avoid adding Unpin bounds.

In `@tests/fixtures/client_streaming.rs`:
- Around line 195-208: The typed mapper currently accepts any payload with at
least one byte by calling payload.into_inner().into_iter().next() and discarding
the rest; change the logic in the match arm that handles frame.id.get() (the
branch that constructs TypedStreamingItem) to enforce that the payload contains
exactly one byte: obtain the payload buffer (payload.into_inner()), verify its
length is exactly 1 (or check the iterator yields one byte and then None), and
if not return a ClientError (same style as the existing InvalidData error)
instead of accepting the frame; keep the construction of
TypedStreamingItem(value) only when exactly one byte is present.
- Around line 85-88: The test drops the client when collect(...) returns an Err
because self.client is taken then immediately await ? propagates the error
before restoring it; change the pattern in the block around reset_state(), the
client field, and collect to take the client into a local variable, await
collect into a Result (e.g., let result = collect(&mut client).await), then
always restore self.client = Some(client) before returning or propagating
(result?), so that call_streaming and collect failures do not leave self.client
absent.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 61df02dc-2024-45d0-aa48-01cd7ca23fbc

📥 Commits

Reviewing files that changed from the base of the PR and between 5130b9d and c66ad6c.

📒 Files selected for processing (5)
  • docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md
  • docs/wireframe-client-design.md
  • src/client/streaming_helpers.rs
  • src/client/tests/streaming_helpers.rs
  • tests/fixtures/client_streaming.rs

Comment thread docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md
Comment thread src/client/streaming_helpers.rs
Comment thread tests/fixtures/client_streaming.rs
Comment thread tests/fixtures/client_streaming.rs Outdated
…nstraint

- Updated StreamingResponseExt::typed_with to require `Self: Unpin` and `Mapper: Unpin`.
- Improved payload byte extraction to ensure exactly one byte is present, returning an error otherwise.
- Refactored test fixture for safer async result handling.
- Minor doc fixes related to client streaming feature scenarios.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
@leynos leynos changed the title Refactor client_streaming tests and add typed streaming helpers Add StreamingResponseExt trait and TypedResponseStream adapter Mar 26, 2026
codescene-delta-analysis[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md (1)

358-363: ⚠️ Potential issue | 🟡 Minor

Fix list punctuation in the Stage B behavioural coverage bullets.

Replace the semicolon-plus-conjunction pattern at Line 359 with a full stop so the two bullets read as parallel items without carry-over punctuation.

✍️ Proposed fix
 Add behavioural coverage by extending the existing client streaming BDD suite:
 
 - extend `tests/features/client_streaming.feature` with one scenario proving
-  typed row consumption and one scenario proving control-frame skipping; and
+  typed row consumption and one scenario proving control-frame skipping.
 - update `tests/fixtures/client_streaming.rs`,
   `tests/steps/client_streaming_steps.rs`, and
   `tests/scenarios/client_streaming_scenarios.rs` so the world calls the new
   public helper directly.

As per coding guidelines, Markdown documentation must follow markdownlint-aligned style and clear list formatting rules.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md` around
lines 358 - 363, The two Stage B behavioural coverage bullets (the ones starting
"extend `tests/features/client_streaming.feature` with one scenario..." and
"update `tests/fixtures/client_streaming.rs`,
`tests/steps/client_streaming_steps.rs`, and
`tests/scenarios/client_streaming_scenarios.rs` so the world calls the new
public helper directly") use a semicolon-plus-conjunction pattern; change the
semicolon and following conjunction to a full stop so each bullet is a separate,
parallel sentence (i.e., end the first bullet with a period and ensure the
second bullet begins as its own sentence).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md`:
- Around line 119-120: In the sentence "adapting a stream of protocol frames
into a stream of domain items while optionally skipping control frames" add a
comma after "items" (i.e., change to "...stream of domain items, while
optionally skipping control frames") to separate the main clause from the
subordinate clause; update that phrase in
docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md.
- Around line 405-406: Edit the sentence containing the phrase "streaming
section so the design doc reflects the new recommended consumption pattern
instead of only the manual `next()` loop." and insert a comma before "so" so it
reads "streaming section, so the design doc reflects the new recommended
consumption pattern instead of only the manual `next()` loop." to properly
separate the two independent clauses.
- Line 43: Replace the first occurrence of the acronym "BDD" in the phrase
"behavioural tests written with `rstest-bdd` v0.5.0 prove the helper against" by
expanding it to "Behaviour-driven development (BDD)" so the acronym is defined
on first use, and remove the later redundant expansion at the existing second
occurrence (currently around the text that expands BDD) to avoid duplication;
target the sentence containing "behavioural tests written with `rstest-bdd`
v0.5.0" and the later sentence that currently expands "Behaviour-driven
development (BDD)".

---

Duplicate comments:
In `@docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md`:
- Around line 358-363: The two Stage B behavioural coverage bullets (the ones
starting "extend `tests/features/client_streaming.feature` with one scenario..."
and "update `tests/fixtures/client_streaming.rs`,
`tests/steps/client_streaming_steps.rs`, and
`tests/scenarios/client_streaming_scenarios.rs` so the world calls the new
public helper directly") use a semicolon-plus-conjunction pattern; change the
semicolon and following conjunction to a full stop so each bullet is a separate,
parallel sentence (i.e., end the first bullet with a period and ensure the
second bullet begins as its own sentence).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 7e7ee3d9-266b-465b-b590-17b8a21aa80d

📥 Commits

Reviewing files that changed from the base of the PR and between c66ad6c and 6192510.

📒 Files selected for processing (3)
  • docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md
  • src/client/streaming_helpers.rs
  • tests/fixtures/client_streaming.rs

Comment thread docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md Outdated
Comment thread docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md Outdated
Comment thread docs/execplans/11-3-1-helpers-for-consuming-streaming-responses.md Outdated
Refined wording and formatting in 11-3-1-helpers-for-consuming-streaming-responses.md to enhance readability and precision. Changes include rephrasing to use "behaviour-driven development (BDD)" instead of "behavioural tests", improved punctuation and grammar for better clarity, and consistency updates in descriptions of testing practices and design documentation references.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
@leynos leynos merged commit 1380bd3 into main Mar 27, 2026
6 checks passed
@leynos leynos deleted the add-streaming-response-helpers-jxq7p8 branch March 27, 2026 12:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant