Skip to content

Implement outbound streaming send_streaming with tests (8.4.1–8.4.5)#485

Merged
leynos merged 7 commits intomainfrom
implement-send-streaming-helper-c91skz
Mar 1, 2026
Merged

Implement outbound streaming send_streaming with tests (8.4.1–8.4.5)#485
leynos merged 7 commits intomainfrom
implement-send-streaming-helper-c91skz

Conversation

@leynos
Copy link
Copy Markdown
Owner

@leynos leynos commented Feb 26, 2026

Summary

  • Implements the outbound streaming transport helper send_streaming with configurable chunking and timeout, expanded to cover roadmap items 8.4.1–8.4.5. Includes unit tests, test infrastructure, and Behavioural (BDD) tests scaffolding. Updates ADR, docs, and roadmap to reflect the new capability and testing approach.

Changes

  • Core implementation
    • Add src/client/send_streaming.rs implementing:
      • SendStreamingConfig (chunk_size, timeout) and SendStreamingOutcome (frames_sent).
      • The send_streaming method on WireframeClient, which streams a raw AsyncRead body into framed chunks prefixed by a protocol header. It derives an effective chunk size from max_frame_length and header length when chunk_size is not provided.
      • Timeout support wrapping the entire operation via tokio::time::timeout; on timeout, a TimedOut error is returned and no partial frame is emitted. All failure paths invoke the error hook.
      • Internal send_streaming_inner loop that reads chunks, builds frames (header + body), and writes frames to the transport with error hook integration.
      • Helper effective_chunk_size(header_len, max_frame_length, config) to validate header vs. frame budget.
    • WireframeClient wiring
      • Expose new types publicly: SendStreamingConfig, SendStreamingOutcome via pub use in src/client/mod.rs and register mod send_streaming;
      • The new module is part of the inbound/outbound symmetry with existing streaming logic.
  • API surface
    • Public exports updated to include:
      • SendStreamingConfig
      • SendStreamingOutcome
  • Tests
    • Unit tests and infra
      • Add src/client/tests/send_streaming.rs (unit tests for core behaviours)
      • Add src/client/tests/send_streaming_infra.rs (test infrastructure)
      • Extend test module wiring to include the new tests
    • Behavioural tests (BDD)
      • Add tests/features/client_send_streaming.feature (BDD scenarios)
      • Add tests/fixtures/client_send_streaming.rs (world and helpers)
      • Add tests/scenarios/client_send_streaming_scenarios.rs
      • Add tests/steps/client_send_streaming_steps.rs (step definitions)
      • Wire up modules in tests/fixtures/mod.rs, tests/steps/mod.rs, tests/scenarios/mod.rs
  • Docs and planning
    • docs/execplans/8-4-1-send-streaming-helper.md: new ExecPlan detailing approach (8.4.1)
    • docs/adr-002-streaming-requests-and-shared-message-assembly.md: update with implementation decisions for 8.4.1–8.4.5
    • docs/roadmap.md: mark 8.4.1–8.4.5 done
    • docs/users-guide.md: add a section “Outbound streaming sends” with usage example and semantics
  • Notes for reviewers
    • Core logic focus: effective_chunk_size calculation, timeout handling, and the inner streaming loop.
    • Ensure error hook is invoked on all failure paths, including transport errors and timeouts.
    • Confirm no public API signature changes to existing methods beyond introducing new module and exports.
  • Migration & compatibility
    • No changes to existing public APIs beyond adding new exports for SendStreamingConfig and SendStreamingOutcome. No breaking changes.
  • Examples
    • See usage example in docs/users-guide.md.

Why this is needed

Roadmap item 8.4.1–8.4.5 introduces outbound streaming for large request bodies. The new helper provides a transport-level, header-aware streaming primitive that reads from an AsyncRead source, chunks data according to protocol budgets, prefixes frames with protocol headers, handles timeouts, and integrates with the existing error hooks. This avoids boilerplate in protocol implementations and aligns outbound streaming with inbound streaming design.

How to test

  • Unit tests: cargo test --lib send_streaming
  • Behavioural tests (BDD): cargo test --test bdd --all-features client_send_streaming
  • Quick verification: Run fmt, lint, and tests as part of gates (make fmt, make lint, make test).

Notes for reviewers

  • Ensure error hook is invoked on all failure paths, including transport errors and timeouts.
  • Confirm no public API signature changes to existing methods beyond introducing new module and exports.
  • Review test infra for clean coverage of frame receipts and timeout scenarios without flakiness.

Migration & compatibility

  • No changes to existing public APIs beyond adding new exports for SendStreamingConfig and SendStreamingOutcome. No breaking changes.

Examples

use wireframe::client::{SendStreamingConfig, WireframeClient};

let header: &[u8] = &[0xCA, 0xFE, 0xBA, 0xBE];
let mut client = WireframeClient::builder().connect(addr).await?;
let body = vec![0u8; 4096];
let cfg = SendStreamingConfig::default().with_chunk_size(512).with_timeout(std::time::Duration::from_secs(5));
let outcome = client.send_streaming(header, &body, cfg).await?;
println!("sent {} frames", outcome.frames_sent());

◳ Generated by DevBoxer ◰


ℹ️ Tag @devboxerhub to ask questions and address PR feedback

📎 Task: https://www.devboxer.com/task/cf7e7719-c829-4cff-a1e1-38df5af2237f

Summary by Sourcery

Add outbound streaming send support to the client with configuration, tests, and documentation updates.

New Features:

  • Introduce SendStreamingConfig and SendStreamingOutcome types and a send_streaming API on WireframeClient to send large request bodies as framed chunks with optional timeout control.

Enhancements:

  • Wire up the new send_streaming module into the client public API and integrate it symmetrically with existing streaming components.
  • Document outbound streaming sends in the user guide and capture implementation decisions and execution planning for roadmap items 8.4.1–8.4.5 in ADR and exec plan docs.
  • Mark roadmap items 8.4.1–8.4.5 as complete for the outbound streaming transport helper.

Documentation:

  • Extend the user guide with an "Outbound streaming sends" section including usage examples and semantics for configuration and outcomes.

Tests:

  • Add unit tests and supporting infrastructure for send_streaming covering chunking, header sizing, timeouts, error-hook invocation, and outcome reporting.
  • Add BDD scenarios, fixtures, and step definitions to exercise client outbound streaming behaviour end-to-end.

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Feb 26, 2026

Reviewer's Guide

Implements a new outbound streaming helper WireframeClient::send_streaming with configurable chunking and timeout, plus tests, BDD scaffolding, and docs/ADR/roadmap updates to integrate it into the client API and testing strategy.

Sequence diagram for WireframeClient::send_streaming with timeout and error hook

sequenceDiagram
    actor Caller
    participant WireframeClient
    participant BodyReader
    participant TokioTimeout
    participant TransportFramed
    participant ErrorHook

    Caller->>WireframeClient: send_streaming(frame_header, body_reader, config)
    WireframeClient->>WireframeClient: effective_chunk_size(header_len, max_frame_length, config)
    alt invalid_header_or_config
        WireframeClient-->>Caller: Err(ClientError)
    else valid_config
        alt config.timeout is Some
            WireframeClient->>TokioTimeout: timeout(duration, send_streaming_inner(...))
            note over TokioTimeout,WireframeClient: Tokio runs send_streaming_inner under timeout
        else no_timeout
            WireframeClient->>WireframeClient: send_streaming_inner(frame_header, body_reader, chunk_size)
        end

        loop for_each_frame
            WireframeClient->>BodyReader: read(chunk_size)
            alt read_ok_and_n_gt_0
                WireframeClient->>TransportFramed: send(Bytes{header + chunk})
                alt transport_error
                    TransportFramed-->>WireframeClient: Err(io_error)
                    WireframeClient->>ErrorHook: invoke_error_hook(ClientError)
                    WireframeClient-->>Caller: Err(ClientError)
                else transport_ok
                    TransportFramed-->>WireframeClient: Ok(())
                end
            else read_eof
                WireframeClient-->>Caller: Ok(SendStreamingOutcome{frames_sent})
            else read_error
                BodyReader-->>WireframeClient: Err(io_error)
                WireframeClient->>ErrorHook: invoke_error_hook(ClientError)
                WireframeClient-->>Caller: Err(ClientError)
            end
        end

        opt timeout_configured
            alt timeout_elapsed_before_completion
                TokioTimeout-->>WireframeClient: Elapsed
                WireframeClient->>ErrorHook: invoke_error_hook(ClientError{TimedOut})
                WireframeClient-->>Caller: Err(ClientError{TimedOut})
            else completed_before_timeout
                TokioTimeout-->>WireframeClient: Ok(Ok(SendStreamingOutcome))
                WireframeClient-->>Caller: Ok(SendStreamingOutcome)
            end
        end
    end
Loading

Class diagram for SendStreamingConfig, SendStreamingOutcome, and WireframeClient::send_streaming

classDiagram
    class WireframeClient {
        +send_streaming(frame_header: slice_u8, body_reader: AsyncRead, config: SendStreamingConfig) Result~SendStreamingOutcome, ClientError~
        -send_streaming_inner(frame_header: slice_u8, body_reader: AsyncRead, chunk_size: usize) Result~SendStreamingOutcome, ClientError~
        -invoke_error_hook(error: ClientError) void
        -codec_config: ClientCodecConfig
    }

    class SendStreamingConfig {
        -chunk_size: Option~usize~
        -timeout: Option~Duration~
        +with_chunk_size(size: usize) SendStreamingConfig
        +with_timeout(duration: Duration) SendStreamingConfig
        +chunk_size() Option~usize~
        +timeout() Option~Duration~
    }

    class SendStreamingOutcome {
        -frames_sent: u64
        +new(frames_sent: u64) SendStreamingOutcome
        +frames_sent() u64
    }

    class ClientCodecConfig {
        +max_frame_length_value() usize
    }

    class ClientError

    class SendStreamingModule {
        +effective_chunk_size(header_len: usize, max_frame_length: usize, config: SendStreamingConfig) Result~usize, ClientError~
    }

    WireframeClient --> SendStreamingConfig : uses
    WireframeClient --> SendStreamingOutcome : returns
    WireframeClient --> ClientCodecConfig : reads_max_frame_length
    WireframeClient --> ClientError : error_type
    SendStreamingModule --> SendStreamingConfig : reads
    SendStreamingModule --> ClientError : returns
Loading

File-Level Changes

Change Details Files
Add SendStreamingConfig, SendStreamingOutcome, and WireframeClient::send_streaming implementation for outbound streaming sends.
  • Introduce SendStreamingConfig with optional chunk_size and timeout plus builder-style accessors and getters.
  • Introduce SendStreamingOutcome with frame count reporting and a constructor.
  • Implement WireframeClient::send_streaming that chunks an AsyncRead body, prefixes each chunk with a caller-provided header, and writes frames via the existing framed transport.
  • Wrap the core send loop in an optional tokio::time::timeout, mapping elapsed to a TimedOut ClientError and ensuring the error hook is invoked.
  • Factor the internal streaming loop into send_streaming_inner and use an effective_chunk_size helper to compute and validate body budget per frame, clamping configured chunk sizes and rejecting headers that exceed frame capacity.
src/client/send_streaming.rs
src/client/mod.rs
Extend unit test coverage and infra for outbound streaming send behaviour.
  • Add send_streaming unit tests covering chunking, boundaries, empty bodies, header-size validation, chunk-size clamping, timeout behaviour, error-hook invocation, and outcome reporting.
  • Provide send_streaming_infra helpers to spin up receiving and dropping servers, construct clients with specific codec limits and error hooks, and generate protocol headers and test bodies.
  • Wire the new tests into the existing client test module tree.
src/client/tests/send_streaming.rs
src/client/tests/send_streaming_infra.rs
src/client/tests/mod.rs
Add BDD scenarios and fixtures for outbound streaming send flows.
  • Define Gherkin feature scenarios for multi-chunk sends, empty sends, timeouts, and transport failures in client outbound streaming.
  • Implement ClientSendStreamingWorld fixture with runtime management, server start/stop helpers, client connection, send_streaming invocations (including blocking readers and timeouts), and verification helpers for frames and error types.
  • Add step definitions that bind send-streaming-specific Gherkin steps to world methods, including shared helper for starting servers and connecting clients.
  • Register new BDD scenarios module to run the feature tests.
tests/features/client_send_streaming.feature
tests/fixtures/client_send_streaming.rs
tests/steps/client_send_streaming_steps.rs
tests/scenarios/client_send_streaming_scenarios.rs
tests/fixtures/mod.rs
tests/steps/mod.rs
tests/scenarios/mod.rs
Update documentation and planning artefacts for outbound streaming sends and roadmap completion.
  • Add an execution plan document detailing scope, risks, plan, and implementation notes for send_streaming (8.4.1–8.4.5).
  • Extend ADR 0002 with concrete implementation decisions: module location, config struct, timeout strategy, error propagation, and raw-byte header semantics for send_streaming.
  • Document outbound streaming sends in the users guide, with a code example, configuration knobs, timeout semantics, and SendStreamingOutcome description.
  • Mark roadmap items 8.4.1–8.4.5 as completed.
docs/execplans/8-4-1-send-streaming-helper.md
docs/adr-002-streaming-requests-and-shared-message-assembly.md
docs/users-guide.md
docs/roadmap.md

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Feb 26, 2026

Summary by CodeRabbit

  • New Features

    • Outbound streaming for large request bodies with configurable chunk sizes and a single-operation timeout; reports frames sent and avoids emitting partial frames on timeout.
  • Documentation

    • User guide, roadmap and ADR/exec-plan added/updated with streaming design, examples and rollout plan.
  • Tests

    • Extensive unit, integration and feature tests plus test fixtures and scenario steps covering framing, boundaries, timeouts and transport failures.

Walkthrough

Add an outbound transport streaming helper: public SendStreamingConfig, SendStreamingOutcome, and WireframeClient::send_streaming that chunk an AsyncRead body into framed writes with an overall timeout; include docs, exec plan, user-guide, unit tests, BDD fixtures, steps and test infra.

Changes

Cohort / File(s) Summary
Documentation & Roadmap
docs/adr-002-streaming-requests-and-shared-message-assembly.md, docs/execplans/8-4-1-send-streaming-helper.md, docs/roadmap.md, docs/users-guide.md
Add ADR and exec plan for outbound streaming, mark roadmap tasks done, and add user-guide section describing API, semantics and examples.
Public API Re-exports
src/client/mod.rs
Re-export SendStreamingConfig and SendStreamingOutcome from new send_streaming module.
Core Implementation
src/client/send_streaming.rs
Add SendStreamingConfig (chunk_size, timeout) and SendStreamingOutcome (frames_sent); implement WireframeClient::send_streaming, internal send_streaming_inner, effective_chunk_size, read/chunk loop, overall-timeout semantics and error mapping (timeout → ClientError::Wireframe(WireframeError::Io(TimedOut))).
Unit Tests & Test Infra
src/client/tests/send_streaming.rs, src/client/tests/send_streaming_infra.rs, src/client/tests/mod.rs
Add comprehensive unit tests for framing, chunking, header semantics, timeout and transport-failure; add test infra (receiving/dropping servers, client factories, blocking reader).
BDD Features & Fixtures
tests/features/client_send_streaming.feature, tests/fixtures/client_send_streaming.rs, tests/fixtures/mod.rs
Add Gherkin feature for multi-chunk/zero-byte/timeout/transport-failure cases and ClientSendStreamingWorld fixture to orchestrate runtime, servers and client.
BDD Scenarios & Steps
tests/scenarios/client_send_streaming_scenarios.rs, tests/scenarios/mod.rs, tests/steps/client_send_streaming_steps.rs, tests/steps/mod.rs
Add scenario bindings and step definitions to run feature tests and assert server-received frames and error conditions.

Sequence Diagram(s)

sequenceDiagram
    participant Client as WireframeClient
    participant SendStream as send_streaming
    participant Reader as AsyncRead\n(Body)
    participant Framing as FrameConstructor
    participant Transport as ClientStream\n(Network)

    Client->>SendStream: send_streaming(header, reader, config)
    Note over SendStream: Start overall timeout (if configured)
    loop read until EOF or error
        SendStream->>Reader: read(effective_chunk_size)
        alt read timed out
            Reader-->>SendStream: TimedOut
            SendStream->>SendStream: Abort, do not emit partial frame
            SendStream-->>Client: ClientError::Wireframe(WireframeError::Io(TimedOut))
        else read ok
            Reader-->>SendStream: chunk_bytes
            SendStream->>Framing: prepend header -> framed_payload
            Framing-->>SendStream: framed_payload
            SendStream->>Transport: send(framed_payload)
            alt transport error
                Transport-->>SendStream: Io error
                SendStream-->>Client: ClientError (frames_sent so far)
            else success
                Transport-->>SendStream: OK
                SendStream->>SendStream: increment frames_sent
            end
        end
    end
    SendStream-->>Client: SendStreamingOutcome { frames_sent }
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

Flow the bytes in headered streams,
Count the frames and guard your dreams,
Timeouts halt before a shard,
Tests and docs keep measure hard,
New helper sings where transports gleam.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarises the main feature (outbound streaming send_streaming) and roadmap items implemented (8.4.1–8.4.5).
Description check ✅ Passed The description comprehensively details the implementation, changes, tests, docs, and rationale for the outbound streaming transport helper.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch implement-send-streaming-helper-c91skz

Comment @coderabbitai help to get the list of available commands and usage tips.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 27, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:

tests/steps/client_send_streaming_steps.rs

Comment on file

fn given_receiving_server(
    client_send_streaming_world: &mut ClientSendStreamingWorld,
) -> TestResult {
    client_send_streaming_world.block_on(|w| {
        Box::pin(async {
            w.start_receiving_server().await?;
            w.connect_client().await
        })
    })?
}

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: given_dropping_server,given_receiving_server

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 27, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:

src/client/tests/send_streaming_infra.rs

Comment on file

pub(super) async fn create_send_client(
    addr: SocketAddr,
) -> Result<
    WireframeClient<
        crate::serializer::BincodeSerializer,
        crate::rewind_stream::RewindStream<tokio::net::TcpStream>,
    >,
    Box<dyn std::error::Error + Send + Sync>,
> {
    Ok(WireframeClient::builder().connect(addr).await?)
}

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: create_send_client,create_send_client_with_max_frame

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 27, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:

src/client/tests/send_streaming.rs

Comment on file

async fn exact_chunk_boundary_produces_single_frame(protocol_header: Vec<u8>) -> TestResult {
    let (server, frames) = spawn_receiving_server().await?;
    let mut client = create_send_client(server.addr).await?;

    let body = test_body(100);
    let config = SendStreamingConfig::default().with_chunk_size(100);

    let outcome = client
        .send_streaming(&protocol_header, &body[..], config)
        .await?;

    if outcome.frames_sent() != 1 {
        return Err(format!(
            "exactly chunk_size bytes should produce 1 frame, got {}",
            outcome.frames_sent()
        )
        .into());
    }

    drop(client);
    tokio::time::sleep(Duration::from_millis(50)).await;

    let received = frames.lock().await;
    if received.len() != 1 {
        return Err(format!("expected 1 frame, got {}", received.len()).into());
    }

    Ok(())
}

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: empty_body_sends_zero_frames,exact_chunk_boundary_produces_single_frame

@coderabbitai

This comment was marked as resolved.

@coderabbitai

This comment was marked as resolved.

@coderabbitai

This comment was marked as resolved.

@leynos leynos changed the title Add outbound streaming helper send_streaming with tests Implement outbound streaming send_streaming with tests (8.4.1–8.4.5) Feb 27, 2026
@leynos leynos marked this pull request as ready for review February 27, 2026 17:25
sourcery-ai[bot]

This comment was marked as resolved.

@coderabbitai coderabbitai Bot added the codex label Feb 27, 2026
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9248745288

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/client/send_streaming.rs Outdated
Comment thread src/client/send_streaming.rs Outdated
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/adr-002-streaming-requests-and-shared-message-assembly.md`:
- Around line 369-370: Update the doc text about SendStreamingOutcome to remove
the claim that it reports progress on failure; instead state that
SendStreamingOutcome reports frames_sent: u64 only on successful completion, and
that callers cannot rely on this type for failure-path progress visibility.
Replace the existing sentence referencing `SendStreamingOutcome` and
`frames_sent: u64` with this corrected wording.

In `@docs/execplans/8-4-1-send-streaming-helper.md`:
- Around line 389-392: Update the test instruction for
send_streaming_timeout_returns_timed_out to match the working approach: remove
the guidance to use tokio::time::pause() and instead instruct creating an
AsyncRead that blocks indefinitely (e.g., a tokio::io::duplex where no data is
ever written or the writer is dropped without writing), set a short timeout, and
assert the resulting error is io::ErrorKind::TimedOut; reference the test name
send_streaming_timeout_returns_timed_out and the types tokio::io::duplex,
AsyncRead, and io::ErrorKind::TimedOut so readers can find and implement the
proven pattern.
- Around line 119-224: Update the document to use the repository-mandated Oxford
en-GB spelling (en-GB-oxendict): replace all instances of "finalise" →
"finalize", "serialisation"/"serialised"/"serialisation" variants →
"serialization"/"serialized", and any other -ise suffixes to -ize forms (e.g.,
"realise"→"realize", "organisation"→"organization"); scan the file for these
tokens and update them consistently (headings, paragraphs, decision log entries,
ADR wording, and glossary-like phrases such as "serialisation layer") so all
occurrences in this document use the -ize spelling.

In `@src/client/send_streaming.rs`:
- Around line 186-190: The call to effective_chunk_size should handle Err
explicitly so the preflight error hook is invoked before returning: replace the
direct ?-propagation of effective_chunk_size(frame_header.len(),
self.codec_config.max_frame_length_value(), &config) with a match (or if let
Err) that on Err calls the configured preflight error hook (e.g.,
self.error_hook.preflight_failed(&err) or whatever the hook method is) and then
returns the error; on Ok continue and bind effective_chunk_size to the success
value.
- Around line 276-278: The match on config.chunk_size currently allows a
requested chunk size of 0 which leads to silent success; modify the logic around
the match on config.chunk_size (the branch that returns
requested.min(available)) to treat Some(0) as invalid: if requested == 0 return
an Err (or propagate an error) indicating an invalid chunk size instead of
returning 0, otherwise use requested.min(available); also add a validation in
the with_chunk_size setter or caller to reject zero up-front so callers cannot
set Some(0).

In `@src/client/tests/send_streaming_infra.rs`:
- Around line 78-85: The test currently drops the accepted socket (tcp) which
performs a graceful FIN close; instead call shutdown on the socket's write
direction to force immediate write failure: after accepting the socket in the
spawned task from listener.accept() call tcp.shutdown(std::net::Shutdown::Write)
(or the async equivalent) before dropping tcp so the client sees a deterministic
transport error; also remove the defensive
tokio::time::sleep(Duration::from_millis(50)) in
invokes_error_hook_on_transport_failure since shutdown makes the error occur on
the first write.

In `@tests/features/client_send_streaming.feature`:
- Line 18: Update the scenario title text from "Client send times out on a slow
body reader" to "Client send operation times out on a slow body reader" in the
feature file and make the matching change in the test definition in
client_send_streaming_scenarios.rs so the scenario name string/identifier in the
Rust test matches the new phrasing exactly.

In `@tests/fixtures/client_send_streaming.rs`:
- Around line 143-145: The Given step set_blocking_reader sets
use_blocking_reader but do_send_streaming_with_timeout doesn't currently honor
it; modify do_send_streaming_with_timeout to check the use_blocking_reader flag
at the start and fail fast (return an immediate error/result) if it's not true,
then proceed with the blocking-reader behavior only when true and reset
use_blocking_reader to false after consuming it; update the equivalent logic
referenced around the other Given step (lines 167-174) so both call sites read
and clear use_blocking_reader the same way.

In `@tests/steps/client_send_streaming_steps.rs`:
- Around line 47-55: Remove the #[expect(clippy::unnecessary_wraps,...)]
attribute and change the step function given_blocking_reader to be infallible:
update its signature to return () (i.e., no -> TestResult) and remove the Ok(())
return so the body simply calls
client_send_streaming_world.set_blocking_reader(); ensuring the function matches
other infallible step examples like then_end.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bfe299b and 9248745.

📒 Files selected for processing (16)
  • docs/adr-002-streaming-requests-and-shared-message-assembly.md
  • docs/execplans/8-4-1-send-streaming-helper.md
  • docs/roadmap.md
  • docs/users-guide.md
  • src/client/mod.rs
  • src/client/send_streaming.rs
  • src/client/tests/mod.rs
  • src/client/tests/send_streaming.rs
  • src/client/tests/send_streaming_infra.rs
  • tests/features/client_send_streaming.feature
  • tests/fixtures/client_send_streaming.rs
  • tests/fixtures/mod.rs
  • tests/scenarios/client_send_streaming_scenarios.rs
  • tests/scenarios/mod.rs
  • tests/steps/client_send_streaming_steps.rs
  • tests/steps/mod.rs

Comment thread docs/adr-002-streaming-requests-and-shared-message-assembly.md Outdated
Comment thread docs/execplans/8-4-1-send-streaming-helper.md Outdated
Comment thread docs/execplans/8-4-1-send-streaming-helper.md Outdated
Comment thread src/client/send_streaming.rs Outdated
Comment thread src/client/send_streaming.rs Outdated
Comment thread src/client/tests/send_streaming_infra.rs
Comment thread tests/features/client_send_streaming.feature Outdated
Comment thread tests/fixtures/client_send_streaming.rs Outdated
Comment thread tests/steps/client_send_streaming_steps.rs Outdated
leynos and others added 2 commits February 27, 2026 22:07
Introduce `send_streaming` method on `WireframeClient` to support sending large request bodies as multiple framed chunks. This method reads from an async source, prepends a protocol-defined header to each chunk, and sends framed packets over the transport.

Chunk size can be configured or auto-derived from the max frame length minus header size. The entire operation can be wrapped with a configurable timeout that returns a `TimedOut` error without emitting partial frames.

Integrate error hook invocation on all failure paths. Return `SendStreamingOutcome` reporting frames sent for partial success awareness.

Add comprehensive unit and BDD tests covering chunking, timeout, error handling, and frame content correctness.

Update documentation (ADR 0002, users-guide, roadmap) to describe new functionality and mark roadmap items 8.4.1-8.4.5 as complete.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…server startup

- Introduce `test_frames_sent_for_body` helper to verify frame counts sent and received.
- Refactor existing tests to use this helper, reducing duplication.
- Update test infrastructure to use common max frame size.
- Abstract server start-and-connect logic in BDD steps to a helper fn.
- Simplify BDD step implementations using the new helper.
- Clean up documentation wording on async server task design and clippy lint usage.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
@devboxerhub devboxerhub Bot force-pushed the implement-send-streaming-helper-c91skz branch from 9248745 to 6b0c2cb Compare February 27, 2026 22:09
coderabbitai[bot]

This comment was marked as resolved.

…st coverage

- Add proper error hook invocation on chunk size calculation failure.
- Shutdown write side on timeout to prevent codec state leakage.
- Refactor chunk read logic to robustly handle EOF and errors.
- Add validation rejecting zero chunk sizes.
- Create a blocking_reader helper to reliably test timeouts.
- Update tests to use blocking_reader for deterministic behavior.
- Strengthen tests for error conditions and chunk/frame correctness.
- Improve documentation and clarify send_streaming outcome semantics.
- Minor docs and test scenario wording improvements for clarity.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
docs/execplans/8-4-1-send-streaming-helper.md (1)

102-105: ⚠️ Potential issue | 🟡 Minor

Update the timeout-risk mitigation to match the implemented method.

Replace this mitigation text. Line 103 still prescribes tokio::time::pause(), but the same document records that this approach failed and was replaced.

🛠️ Proposed fix
-- Risk: unit tests for timeout behaviour can be flaky with real time.
-  Severity: medium Likelihood: medium Mitigation: use `tokio::time::pause()`
-  for deterministic virtual time in timeout tests.
+- Risk: unit tests for timeout behaviour can be flaky with real time.
+  Severity: medium Likelihood: medium Mitigation: use an `mpsc`-backed
+  blocking `AsyncRead` (for example `tokio_util::io::StreamReader`) with a
+  short wall-clock timeout, and avoid `tokio::time::pause()` with
+  `tokio::io::duplex` for this path.

As per coding guidelines, "Use markdown files within the docs/ directory as a knowledge base and source of truth for project requirements, dependency choices, and architectural decisions... Ensure documentation remains accurate and current."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/execplans/8-4-1-send-streaming-helper.md` around lines 102 - 105,
Replace the incorrect mitigation that recommends tokio::time::pause() with the
actual deterministic virtual-time approach used in our tests: remove the
explicit tokio::time::pause() suggestion and instead describe the project's
implemented helper/strategy for virtualizing time (name that helper or test
utility used in the codebase) so the mitigation matches reality and points
readers to the correct test helper/implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/client/tests/send_streaming.rs`:
- Around line 54-55: The test uses fixed sleeps (drop(client);
tokio::time::sleep(Duration::from_millis(50)).await;) which causes timing races;
replace each fixed sleep with an explicit readiness check or awaiting the
server/test task instead — for example, await a server handle future
(server_handle.await) or poll a shared counter/AtomicUsize or oneshot receiver
in a loop with tokio::time::timeout until the expected frame count is reached,
then assert; update the occurrences around drop(client) and the other locations
(near lines 98–99, 157–158, 205–206, 288–289) to use the same condition-based
synchronization instead of sleep.

In `@tests/fixtures/client_send_streaming.rs`:
- Around line 156-158: The test duplicates the cyclic byte generation used by
the existing helper test_body
(src/client/tests/send_streaming_infra.rs::test_body); replace the inline body
creation (the local body: Vec<u8> computed from 0..body_size) with a call to the
shared helper or move the helper into a common test util and call that from
tests/fixtures/client_send_streaming.rs so both tests reuse the same function
(reference test_body) to remove duplication.

---

Duplicate comments:
In `@docs/execplans/8-4-1-send-streaming-helper.md`:
- Around line 102-105: Replace the incorrect mitigation that recommends
tokio::time::pause() with the actual deterministic virtual-time approach used in
our tests: remove the explicit tokio::time::pause() suggestion and instead
describe the project's implemented helper/strategy for virtualizing time (name
that helper or test utility used in the codebase) so the mitigation matches
reality and points readers to the correct test helper/implementation.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6b0c2cb and 2a325d2.

📒 Files selected for processing (10)
  • docs/adr-002-streaming-requests-and-shared-message-assembly.md
  • docs/execplans/8-4-1-send-streaming-helper.md
  • docs/users-guide.md
  • src/client/send_streaming.rs
  • src/client/tests/send_streaming.rs
  • src/client/tests/send_streaming_infra.rs
  • tests/features/client_send_streaming.feature
  • tests/fixtures/client_send_streaming.rs
  • tests/scenarios/client_send_streaming_scenarios.rs
  • tests/steps/client_send_streaming_steps.rs

Comment thread src/client/tests/send_streaming.rs Outdated
Comment thread tests/fixtures/client_send_streaming.rs Outdated
@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 28, 2026

@coderabbitai Have the following now been resolved?

Please address the comments from this code review:

## Overall Comments
- ADR 0002 still states that timeouts MUST not emit partial frames, while the new ExecPlan and users guide text explicitly allow partial progress on timeout; please reconcile these documents (and, if needed, the implementation) so the timeout/partial-frame semantics are clearly consistent in one direction.
- In the BDD world (`ClientSendStreamingWorld`), the `use_blocking_reader` flag and `set_blocking_reader` are never read, so either wire that flag into the send path or remove the dead field/method to keep the fixture focused.
- Inside `send_streaming_inner`, `frame.extend_from_slice(buf.get(..n).unwrap_or(&buf));` can be simplified to using the guaranteed-in-bounds slice (e.g. `&buf[..n]`), which avoids the unnecessary `get(..).unwrap_or` branch for a clearer hot path.

## Individual Comments

### Comment 1
<location path="src/client/send_streaming.rs" line_range="238-240" />
<code_context>
+
+            let mut frame = Vec::with_capacity(header_len + n);
+            frame.extend_from_slice(frame_header);
+            frame.extend_from_slice(buf.get(..n).unwrap_or(&buf));
+
+            if let Err(e) = self.framed.send(Bytes::from(frame)).await {
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Avoid `get(..n).unwrap_or(&buf)` and slice the buffer directly with `&buf[..n]`.

`read(&mut buf)` guarantees `n <= buf.len()`, so `buf.get(..n)` can’t be `None`. Using `get(..n).unwrap_or(&buf)` hides this invariant and, if ever violated, would send the entire buffer. Slicing directly (`frame.extend_from_slice(&buf[..n]);`) is clearer and avoids this risky fallback.

```suggestion
            let mut frame = Vec::with_capacity(header_len + n);
            frame.extend_from_slice(frame_header);
            frame.extend_from_slice(&buf[..n]);
```
</issue_to_address>

### Comment 2
<location path="tests/fixtures/client_send_streaming.rs" line_range="28-37" />
<code_context>
+    use_blocking_reader: bool,
</code_context>
<issue_to_address>
**issue (testing):** The `use_blocking_reader` flag is never used, making the corresponding step definition misleading

`ClientSendStreamingWorld` exposes `use_blocking_reader`, and the Given step "a send-streaming body reader that blocks indefinitely" sets it, but `do_send_streaming` / `do_send_streaming_with_timeout` never use it. That Given step currently has no effect. Please either use this flag in the send helpers (e.g. select a blocking vs normal `StreamReader`) or remove the flag and Given step to avoid misleading tests.
</issue_to_address>

### Comment 3
<location path="tests/steps/client_send_streaming_steps.rs" line_range="66-74" />
<code_context>
+    )
+}
+
+#[when(
+    "the client streams {body_size:usize} bytes with a {header_size:usize} byte header and \
+     {chunk_size:usize} byte chunks"
+)]
+fn when_send_streaming(
+    client_send_streaming_world: &mut ClientSendStreamingWorld,
+    body_size: usize,
+    _header_size: usize,
+    chunk_size: usize,
+) -> TestResult {
+    client_send_streaming_world
</code_context>
<issue_to_address>
**issue (testing):** The `header_size` parameter in the step is unused, so varying it in feature scenarios has no effect

The step takes `{header_size: usize}` but ignores it (`_header_size`), while the world always uses a fixed 4‑byte `protocol_header`. So changing `header_size` in scenarios has no effect. Either wire `header_size` through to `world.protocol_header` (e.g., resize the header) so tests actually vary header size, or remove `header_size` from the step text if it isn’t meant to be configurable in these tests.
</issue_to_address>

### Comment 4
<location path="src/client/send_streaming.rs" line_range="228" />
<code_context>
+        let mut frames_sent: u64 = 0;
+
+        loop {
+            let n = match body_reader.read(&mut buf).await {
+                Ok(0) => break,
+                Ok(n) => n,
</code_context>
<issue_to_address>
**suggestion (review_instructions):** This `match` has three branches and should be extracted into a predicate/helper function to comply with the multi-branch conditional rule.

The `match` on `body_reader.read(..)` has three arms (`Ok(0)`, `Ok(n)`, `Err(e)`), which falls under the guideline to move conditionals with more than two branches into a predicate function.

Consider extracting this into a small helper (e.g. a function that encapsulates the read and maps it into an enum or a simple result type), and then use that helper here to keep the main loop logic simpler and in line with the instruction.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.rs`

**Instructions:**
Move conditionals with >2 branches into a predicate function.

</details>
</issue_to_address>

### Comment 5
<location path="docs/execplans/8-4-1-send-streaming-helper.md" line_range="3" />
<code_context>
+# Implement `send_streaming(frame_header, body_reader)` transport helper (8.4.1)
+
+This ExecPlan is a living document. The sections `Constraints`, `Tolerances`,
+`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and
+`Outcomes & Retrospective` must be kept up to date as work proceeds.
</code_context>
<issue_to_address>
**suggestion (review_instructions):** "ExecPlan" is introduced as an acronym without being defined on first use.

To follow the documentation guidelines, spell out the term on first use and introduce the acronym, for example: "This execution plan (ExecPlan) is a living document." Subsequent uses can then just say "ExecPlan".

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.md`

**Instructions:**
Define uncommon acronyms on first use.

</details>
</issue_to_address>

### Comment 6
<location path="docs/execplans/8-4-1-send-streaming-helper.md" line_range="105" />
<code_context>
+  Severity: medium Likelihood: medium Mitigation: use `tokio::time::pause()`
+  for deterministic virtual time in timeout tests.
+
+- Risk: BDD step text could conflict with existing `client_streaming` steps.
+  Severity: low Likelihood: medium Mitigation: use a `send-streaming` step
+  prefix for all step definitions.
</code_context>
<issue_to_address>
**suggestion (review_instructions):** "BDD" is used without expanding the acronym on first use.

Consider expanding BDD the first time it appears, for example: "Behaviour-Driven Development (BDD) step text could conflict..." After that, using just "BDD" is fine.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.md`

**Instructions:**
Define uncommon acronyms on first use.

</details>
</issue_to_address>

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Feb 28, 2026

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check src/client/send_streaming.rs for the buf slicing issue and match extraction
echo "=== Checking src/client/send_streaming.rs around line 238-240 for buf slicing ==="
cat src/client/send_streaming.rs | head -n 250 | tail -n 30

echo -e "\n=== Checking src/client/send_streaming.rs around line 228 for match statement ==="
cat src/client/send_streaming.rs | head -n 240 | tail -n 20

Length of output: 1840


🏁 Script executed:

#!/bin/bash
# Check tests/fixtures/client_send_streaming.rs for use_blocking_reader
echo "=== Checking tests/fixtures/client_send_streaming.rs for use_blocking_reader field ==="
rg -n "use_blocking_reader" tests/fixtures/client_send_streaming.rs

Length of output: 86


🏁 Script executed:

#!/bin/bash
# Check tests/steps/client_send_streaming_steps.rs for header_size parameter
echo "=== Checking tests/steps/client_send_streaming_steps.rs for header_size usage ==="
rg -n -A5 "when_send_streaming" tests/steps/client_send_streaming_steps.rs

Length of output: 548


🏁 Script executed:

#!/bin/bash
# Check docs/execplans/8-4-1-send-streaming-helper.md for ExecPlan and BDD definitions
echo "=== Checking first use of ExecPlan in docs/execplans/8-4-1-send-streaming-helper.md ==="
cat docs/execplans/8-4-1-send-streaming-helper.md | head -n 10

echo -e "\n=== Checking first use of BDD in docs/execplans/8-4-1-send-streaming-helper.md ==="
rg -n "BDD" docs/execplans/8-4-1-send-streaming-helper.md | head -n 3

Length of output: 789


🏁 Script executed:

#!/bin/bash
# Check ADR 0002 and users guide for timeout semantics consistency
echo "=== Checking timeout semantics in ADR 0002 ==="
rg -n -C3 "timeout.*partial.*frame|partial.*frame.*timeout" docs/adr-002-streaming-requests-and-shared-message-assembly.md

echo -e "\n=== Checking timeout semantics in users guide ==="
rg -n -C3 "timeout" docs/users-guide.md | head -n 20

Length of output: 1464


🏁 Script executed:

#!/bin/bash
# Check more of send_streaming_inner to see the full frame assembly logic
echo "=== Checking frame assembly in send_streaming_inner ==="
rg -n -A10 "let chunk = buf.get" src/client/send_streaming.rs

Length of output: 586


🏁 Script executed:

#!/bin/bash
# Check the full when_send_streaming function body
echo "=== Checking full when_send_streaming function ==="
cat tests/steps/client_send_streaming_steps.rs | sed -n '67,77p'

Length of output: 407


🏁 Script executed:

#!/bin/bash
# Check ADR 0002 for timeout semantics - search more broadly
echo "=== Checking ADR 0002 for timeout-related text ==="
rg -n -C5 "timeout" docs/adr-002-streaming-requests-and-shared-message-assembly.md | head -n 40

Length of output: 1860


🏁 Script executed:

#!/bin/bash
# Check if there's a read_chunk helper function defined
echo "=== Checking for ReadChunk enum and read_chunk function ==="
rg -n -A10 "enum ReadChunk|fn read_chunk" src/client/send_streaming.rs

Length of output: 829


🏁 Script executed:

#!/bin/bash
# Check the do_send_streaming method in the fixture to see if header_size is used
echo "=== Checking do_send_streaming implementation ==="
rg -n -A15 "pub async fn do_send_streaming" tests/fixtures/client_send_streaming.rs

Length of output: 1512


🏁 Script executed:

#!/bin/bash
# Search for MUST not emit partial frame text in ADR
echo "=== Searching for 'MUST NOT' or 'must not' with 'partial' in ADR ==="
rg -n -i -C5 "must not.*partial|partial.*must not" docs/adr-002-streaming-requests-and-shared-message-assembly.md

Length of output: 748


🏁 Script executed:

#!/bin/bash
# Search for the outbound streaming section in ADR 0002
echo "=== Checking outbound streaming section in ADR 0002 ==="
rg -n -C10 "send_streaming|outbound streaming" docs/adr-002-streaming-requests-and-shared-message-assembly.md | head -n 60

Length of output: 3323


🏁 Script executed:

#!/bin/bash
# Check the users guide for partial frame text
echo "=== Checking users guide for partial frame or timeout behavior ==="
rg -n -C5 "partial|timeout.*frame|frame.*timeout" docs/users-guide.md

Length of output: 5578


🏁 Script executed:

#!/bin/bash
# Check ExecPlan for partial frame/timeout text
echo "=== Checking ExecPlan for timeout and partial frame semantics ==="
rg -n -C5 "partial.*frame|timeout.*operation" docs/execplans/8-4-1-send-streaming-helper.md | head -n 40

Length of output: 2151


@leynos: Yes, all of the issues have been resolved! Here's the status:

Overall Comments — ✅ All Resolved

  1. Timeout/partial-frame semantics consistency: The documents are now consistent. The ADR states timeouts "MUST not emit a partial frame" (meaning no incomplete frame), whilst the users guide clarifies "frames already sent remain sent" (meaning complete frames persist). There's no conflict—the timeout stops the operation without emitting a half-written frame, whilst previously completed frames remain sent.

  2. use_blocking_reader flag removal: The field has been removed from ClientSendStreamingWorld—no references found in the fixture file.

  3. Buffer slicing improvement: The code now uses buf.get(..n).ok_or_else(|| ...) (line 246), which explicitly validates the slice bounds and returns an error if violated, then uses the validated chunk. This is actually better than the suggested direct slice, as it adds an explicit invariant check.

Individual Comments — ✅ All Resolved

  1. buf.get(..n).unwrap_or(&buf): Resolved with the improved approach described above (explicit validation via ok_or_else).

  2. use_blocking_reader unused: Resolved—field removed from the fixture.

  3. header_size parameter unused: Now wired through! Line 155 in tests/fixtures/client_send_streaming.rs shows header.resize(header_size, 0x00); and line 73 in the steps file passes it through to do_send_streaming.

  4. 3-branch match extraction: Resolved—lines 269-285 define ReadChunk enum and read_chunk() helper function, which is called at line 234 in the main loop.

  5. ExecPlan acronym undefined: Resolved—line 3 now reads "This execution plan (ExecPlan) is a living document."

  6. BDD acronym undefined: Resolved—line 106 now reads "Behaviour-Driven Development (BDD) step text could conflict with..."

All issues have been addressed. The implementation is in good shape! 🎉

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 28, 2026

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:

src/client/tests/send_streaming.rs

Comment on file

async fn rejects_oversized_header() -> TestResult {
    let (server, _frames) = spawn_receiving_server().await?;
    let mut client = create_send_client(server.addr).await?;

    let header = vec![0u8; DEFAULT_MAX_FRAME];
    let config = SendStreamingConfig::default();
    assert_io_error(
        client
            .send_streaming(&header, b"hello" as &[u8], config)
            .await,
        io::ErrorKind::InvalidInput,
    )
}

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: rejects_oversized_header,rejects_zero_chunk_size

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 28, 2026

@coderabbitai Have the following now been resolved?

Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/users-guide.md`:
- Around line 1406-1410: The numbered footnote [^53] is out of order: update the
inline reference "[^53]" and its corresponding definition ("Implemented in
`src/client/send_streaming.rs`. See [ADR 0002]..., Section 4, for design
rationale.") so that the footnote number matches its first appearance in the
document; renumber the inline marker and the footnote definition to the correct
sequential number (and update any cross-references to that footnote) so numbered
footnotes are ordered by first appearance.

In `@src/client/send_streaming.rs`:
- Around line 193-207: The timeout branch in send_streaming (where
tokio::time::timeout around self.send_streaming_inner returns Err) must shutdown
the framed write side to avoid flushing buffered frames on connection reuse;
modify the Err(_elapsed) handler in send_streaming to call the framed
transport's shutdown (e.g., invoke a shutdown on the Framed write half or
underlying stream) before invoking invoke_error_hook and returning
Err(ClientError), add the necessary import for the shutdown method used, and
update the send_streaming/docs comment to state that a timeout terminates the
connection and it must not be reused.

---

Duplicate comments:
In `@docs/adr-002-streaming-requests-and-shared-message-assembly.md`:
- Around line 388-390: The wording is incorrect: update the doc text around
SendStreamingOutcome to state that frames_sent is reported only on successful
completion (not on failure), and clarify that failures return a ClientError
(i.e., failure-path progress is not visible via SendStreamingOutcome). Edit the
lines mentioning `frames_sent: u64` and the timeout note so they reference
`SendStreamingOutcome`, `frames_sent`, `send_streaming`, and `ClientError`
exactly, replacing any language that implies failure-path visibility with a
clear statement that errors are returned as `ClientError` instead.

In `@docs/execplans/8-4-1-send-streaming-helper.md`:
- Line 119: Update this document to use the repository-standard Oxford
spellings: replace all instances of "finalise" with "finalize", "serialisation"
with "serialization", and "serialised" with "serialized" throughout the file
(including the occurrences flagged in the comment segment and the other noted
locations); ensure consistency across headings, body text, and checklist items
so the document conforms to the en-GB-oxendict suffix -ize guideline.
- Around line 389-392: Update the test instruction for
send_streaming_timeout_returns_timed_out to remove the tokio::time::pause()
suggestion and instead instruct using an AsyncRead that blocks indefinitely
(e.g., create a tokio::io::duplex and never write to the writer side) as the
proven approach; set a short timeout and assert the resulting error is
io::ErrorKind::TimedOut so the test matches the working pattern used elsewhere.

In `@src/client/send_streaming.rs`:
- Around line 186-190: The call to effective_chunk_size in send_streaming.rs
currently uses the ? operator and therefore returns on error without invoking
the observability hook; update the error handling at the
effective_chunk_size(...) call site to match the documented contract by matching
the Result, calling invoke_error_hook(...) with the error (providing the same
context used elsewhere), and then returning the error (or converting it as
before). Locate the call to effective_chunk_size, replace the ? usage with a
match or if let Err(e) branch that calls invoke_error_hook(e.clone() or
appropriate reference) and then returns Err(e) so the failure-path is observed.
- Around line 276-279: The code currently allows config.chunk_size == Some(0)
and returns 0 (so a non-empty body is treated as zero frames); update the logic
around the match that returns Ok(...) (where config.chunk_size and available are
used) to explicitly reject a zero chunk size: if config.chunk_size == Some(0)
return an Err with a clear invalid-argument error (or map it to the function's
existing error type) instead of returning 0, otherwise keep the existing
min(available) or available behavior; reference the same symbols
config.chunk_size and available in your check so the change is localized to the
existing match/return site.

In `@tests/features/client_send_streaming.feature`:
- Line 18: Rename the Gherkin scenario title in
tests/features/client_send_streaming.feature to "Client send operation times out
on a slow body reader" and update the corresponding Rust scenario identifier
string in tests/scenarios/client_send_streaming_scenarios.rs (the line assigning
name = "...") to match that exact text so the feature and Rust scenario name are
synchronized.

In `@tests/fixtures/client_send_streaming.rs`:
- Around line 143-145: In do_send_streaming_with_timeout, read the
use_blocking_reader flag at the start and if it's false return an Err
immediately (fail fast), then set use_blocking_reader = false to consume/reset
the flag before proceeding; update the same pattern for the other timeout send
path variant referenced around lines 167–190 so both timeout send paths check,
fail fast, and clear use_blocking_reader (refer to the method name
do_send_streaming_with_timeout and the struct field use_blocking_reader).

In `@tests/steps/client_send_streaming_steps.rs`:
- Around line 47-55: Remove the clippy suppression and make the step infallible
by editing the #[given("a send-streaming body reader that blocks indefinitely")]
step: delete the #[expect(clippy::unnecessary_wraps,...)] attribute, change the
function signature fn given_blocking_reader(client_send_streaming_world: &mut
ClientSendStreamingWorld) -> TestResult to return () (i.e., fn
given_blocking_reader(client_send_streaming_world: &mut
ClientSendStreamingWorld)), and remove the trailing Ok(()) so the body only
calls client_send_streaming_world.set_blocking_reader();.

@coderabbitai

This comment was marked as resolved.

@coderabbitai

This comment was marked as resolved.

…for determinism

- Refactor `spawn_receiving_server` to return a `ReceivingServer` struct with an async `collect_frames` method, eliminating mutex-based frame sharing and avoiding fixed sleeps in tests.
- Update send streaming tests to await server frame collection instead of sleeping for fixed durations.
- Update tests to use the new API and remove unnecessary sleeps.
- Add a helper `test_body` function in the fixtures for deterministic body data.
- Update docs to recommend deterministic blocking reader usage for timeout tests instead of unreliable tokio time pausing.

These changes improve test reliability and deterministically synchronize client-server interactions in streaming tests.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/execplans/8-4-1-send-streaming-helper.md`:
- Line 34: Update the first occurrence of "Behavioural tests (`rstest-bdd`
v0.5.0)" to expand the BDD acronym immediately after first use; e.g., change the
sentence to include "(Behaviour-Driven Development, BDD)" right after
"Behavioural" or "Behavioural tests" so the line reads like "Behavioural tests
(Behaviour-Driven Development, BDD) (`rstest-bdd` v0.5.0) validate end-to-end
streaming send scenarios." Ensure the `rstest-bdd` reference and version remain
unchanged and keep the phrasing natural.
- Around line 711-717: Update the "Timeout semantics" section to explicitly
state that timeouts are transport-level write failures and the connection must
be terminated and not reused after a timeout: after the sentence that ends with
"partially successful." add a short sentence referencing
`std::io::ErrorKind::TimedOut` (and ADR 0002 if desired) that says the
connection actor SHOULD terminate the connection and callers must not reuse the
connection once a `TimedOut` error is returned; keep the wording concise and
place it before the "# Errors" subsection.
- Around line 50-52: Consolidate the three repetitive MUST bullets about
send_streaming into fewer, varied sentences: update the three lines referencing
send_streaming so they either combine related constraints (e.g.,
"`send_streaming` MUST NOT emit partial frames on timeout or perform automatic
retries.") and a separate line for timeout behavior ("Timeouts MUST return
`std::io::ErrorKind::TimedOut`."), or rephrase them to vary structure (e.g.,
keep "`send_streaming` MUST NOT emit a partial frame on timeout.", change the
second to "Automatic retries are forbidden.", and the third to "Timeouts MUST
surface as `std::io::ErrorKind::TimedOut`."). Ensure the normative MUST/NOT
language and the symbols `send_streaming` and `std::io::ErrorKind::TimedOut` are
preserved.

In `@src/client/tests/send_streaming_infra.rs`:
- Around line 197-202: The test_body function uses
u8::try_from(...).unwrap_or(0) unnecessarily because i.wrapping_rem(256) is
always 0–255; update the closure in test_body to directly cast the iterator
index to u8 (e.g., |i| i as u8) and remove the try_from/unwrap_or machinery so
the code is simpler and idiomatic.

In `@src/client/tests/send_streaming.rs`:
- Around line 318-343: The test invokes_error_hook_on_transport_failure uses a
fragile tokio::time::sleep(Duration::from_millis(50)) to wait for the server
write-side shutdown; replace this nondeterministic wait with a deterministic
synchronization mechanism: change spawn_dropping_server (or
create_send_client_with_error_hook) to return or expose a readiness/closed
signal (e.g., a oneshot/Notify or a future) that is completed when the server
has accepted the connection and shut down the write side, then await that signal
before calling client.send_streaming(b"\x01", &body[..], config).await so the
test deterministically observes the transport failure instead of sleeping.

In `@tests/fixtures/client_send_streaming.rs`:
- Around line 52-55: The closure in test_body is overcomplicating conversion by
using u8::try_from(...).unwrap_or(0); replace the map body inside test_body (the
closure passed to .map(|i| ...)) with a direct cast to u8 (e.g., map(|i| i as
u8) or map(|i| i.wrapping_rem(256) as u8)) since the value always fits in a u8.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2a325d2 and a60de6f.

📒 Files selected for processing (4)
  • docs/execplans/8-4-1-send-streaming-helper.md
  • src/client/tests/send_streaming.rs
  • src/client/tests/send_streaming_infra.rs
  • tests/fixtures/client_send_streaming.rs

Comment thread docs/execplans/8-4-1-send-streaming-helper.md Outdated
Comment thread docs/execplans/8-4-1-send-streaming-helper.md Outdated
Comment thread docs/execplans/8-4-1-send-streaming-helper.md Outdated
Comment thread src/client/tests/send_streaming_infra.rs Outdated
Comment thread src/client/tests/send_streaming.rs
Comment thread tests/fixtures/client_send_streaming.rs Outdated
- Refactor send_streaming tests by adding a helper function to assert expected I/O errors, reducing duplication.
- Change spawn_dropping_server to return a notification handle allowing deterministic synchronization instead of sleeps.
- Update tests to await this notification to improve reliability.
- Enhance code comments and documentation for timeout and error behavior, emphasizing that TimedOut errors indicate transport failure and require connection termination.
- Minor improvements to BDD test documentation wording and formatting.

These changes clarify the error handling contract and improve the stability and maintainability of send_streaming tests.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 28, 2026

@coderabbitai Have the following now been resolved?

❌ Remaining Issue (1/11)

send_streaming doc comment missing connection termination warning — The doc comment (lines 141-147) mentions timeout semantics and partial success, but does NOT explicitly state that the connection must not be reused after a timeout. The issue requested: "update the send_streaming/docs comment to state that a timeout terminates the connection and it must not be reused."

The current doc says:

When a timeout is configured, it wraps the entire operation. If the timeout elapses, no further frames are emitted and std::io::ErrorKind::TimedOut is returned. Any frames already sent remain sent — callers must assume the operation may have been partially successful.

It should add a sentence warning that the connection is in an indeterminate state and must not be reused.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Feb 28, 2026

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the current doc comment for send_streaming method
echo "=== Checking send_streaming doc comment in src/client/send_streaming.rs ==="
sed -n '130,210p' src/client/send_streaming.rs

Length of output: 3229


@leynos: Yes, this issue has been resolved! ✅

The doc comment (lines 146-148) now explicitly states:

A TimedOut error is a transport-level write failure; the connection SHOULD be terminated and MUST NOT be reused (see ADR 0002, §4).

This directly addresses the requirement. The warning uses RFC 2119 keywords (SHOULD, MUST NOT) to make the semantics unambiguous and references the ADR for design rationale. The connection termination requirement is now clearly documented.

All 11 issues are now resolved! 🎉

…y expects

Replaced usages of `u8::try_from(i.wrapping_rem(256)).unwrap_or(0)` with `(i % 256) as u8` in test helpers to generate deterministic test byte patterns. Added `#[expect]` attributes to suppress Clippy lints for integer division remainder and possible truncation, with explanations. This change improves code clarity and explicitly documents intentional use of modulo operations in tests.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 28, 2026


@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Feb 28, 2026

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Feb 28, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/execplans/8-4-1-send-streaming-helper.md`:
- Line 260: The phrase "sub-modules" in the document entry for
`src/client/tests/mod.rs` should be changed to the single-word form
"submodules"; update the text that currently reads "`src/client/tests/mod.rs` —
unit test module with sub-modules." to use "submodules" instead to match
standard compound usage.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a60de6f and c0e8934.

📒 Files selected for processing (5)
  • docs/execplans/8-4-1-send-streaming-helper.md
  • src/client/send_streaming.rs
  • src/client/tests/send_streaming.rs
  • src/client/tests/send_streaming_infra.rs
  • tests/fixtures/client_send_streaming.rs

Comment thread docs/execplans/8-4-1-send-streaming-helper.md Outdated
Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/execplans/8-4-1-send-streaming-helper.md`:
- Around line 499-509: The documentation's Stage E quality-gate command list is
missing the required "make nixie" invocation; update the command block(s) that
currently run "make markdownlint" (and the other gates) to also run "make nixie"
so the documented validation flow matches repository checks — specifically add
"make nixie" alongside "make markdownlint" in the Stage E blocks referenced
around the existing commands (the lines invoking make markdownlint /
MDLINT=/root/.bun/bin/markdownlint-cli2) so Mermaid diagram validation is
included.
- Around line 485-493: Update the users guide and Stage D docs to explicitly
document the tuple-based multi-packet streaming API: add a subsection titled
"Outbound streaming sends" that explains when to use send_streaming, shows the
SendStreamingConfig builder API, and details timeout semantics and
partial-success assumptions; additionally, describe the helper constructors that
prepare the channel pair for multi-packet streaming (mention their names and the
tuple shape they produce), include a brief example demonstrating typical usage
with send_streaming + SendStreamingConfig and the helper constructors, and
ensure the Stage D checklist requires this tuple-based coverage so published
docs match repository standards.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0e8934 and 3ca05c3.

📒 Files selected for processing (1)
  • docs/execplans/8-4-1-send-streaming-helper.md

Comment thread docs/execplans/8-4-1-send-streaming-helper.md
Comment thread docs/execplans/8-4-1-send-streaming-helper.md
@leynos leynos merged commit 859f2b9 into main Mar 1, 2026
6 checks passed
@leynos leynos deleted the implement-send-streaming-helper-c91skz branch March 1, 2026 10:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant