Client streaming: ResponseStream API for multi-packet responses#462
Client streaming: ResponseStream API for multi-packet responses#462
Conversation
…call_streaming - Extend Packet trait with is_stream_terminator method for end-of-stream detection. - Introduce ResponseStream type implementing futures::Stream to consume multi-frame server responses. - Add call_streaming and receive_streaming methods on WireframeClient for high- and low-level streaming APIs. - Implement correlation ID validation and terminator frame handling in streamed responses. - Provide unit and BDD tests covering normal, empty, mismatch, and disconnect scenarios. - Update docs: multi-packet design, user guide with streaming client usage, and roadmap. This completes design goal G5 by enabling clients to asynchronously consume streaming and multi-packet responses with proper correlation and terminator semantics, propagating back-pressure naturally through TCP. No breaking API changes; all additions are backward-compatible. Closes #10.3.1. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Reviewer's GuideImplements client-side streaming support for multi-packet server responses by introducing a typed ResponseStream that borrows WireframeClient, extends Packet with terminator detection, adds streaming-specific ClientError variants, wires new call_streaming/receive_streaming APIs, and adds comprehensive tests and documentation. Sequence diagram for client call_streaming and ResponseStream consumptionsequenceDiagram
actor ClientApp
participant WireframeClient
participant ResponseStream
participant Server
ClientApp->>WireframeClient: call_streaming(request)
Note over WireframeClient: Ensure request has correlation_id (cid)
WireframeClient->>Server: send request frame (cid)
WireframeClient-->>ClientApp: ResponseStream
loop while stream not terminated
ClientApp->>ResponseStream: next().await
ResponseStream->>WireframeClient: poll_next
WireframeClient->>Server: read next frame
Server-->>WireframeClient: data frame (cid)
WireframeClient-->>ResponseStream: raw bytes
ResponseStream->>ResponseStream: deserialize to Packet
ResponseStream->>ResponseStream: validate correlation_id
alt cid mismatch
ResponseStream-->>ClientApp: Err StreamCorrelationMismatch
note over ClientApp,ResponseStream: break
else cid ok
alt is_stream_terminator
ResponseStream->>ResponseStream: mark terminated
ResponseStream-->>ClientApp: None
note over ClientApp,ResponseStream: break
else data frame
ResponseStream-->>ClientApp: Ok(packet)
end
end
end
Class diagram for Packet, ResponseStream, WireframeClient, and ClientErrorclassDiagram
class Packet {
<<trait>>
+correlation_id() Option~u64~
+set_correlation_id(cid Option~u64~) void
+id() u32
+into_parts() PacketParts
+from_parts(parts PacketParts) Self
+is_stream_terminator() bool
}
class Envelope {
+id: u32
+correlation_id: Option~u64~
+payload: Vec~u8~
+new(id u32, correlation_id Option~u64~, payload Vec~u8~) Envelope
+id() u32
+correlation_id() Option~u64~
+set_correlation_id(cid Option~u64~) void
+into_parts() PacketParts
+from_parts(parts PacketParts) Envelope
}
Packet <|.. Envelope
class ClientError {
<<enum>>
Wireframe
Serialize
PreambleEncode
PreambleWrite
PreambleRead
PreambleTimeout
CorrelationMismatch
StreamCorrelationMismatch
StreamTerminated
+decode(err any) ClientError
+disconnected() ClientError
}
class ResponseStream {
<<generic<'a, P, S, T, C>>>
-client: &'a mut WireframeClient~S, T, C~
-correlation_id: u64
-terminated: bool
-_phantom: PhantomData~fn() -> P~
+new(client &'a mut WireframeClient~S, T, C~, correlation_id u64) ResponseStream~'a, P, S, T, C~
+correlation_id() u64
+is_terminated() bool
-process_frame(bytes &[u8]) Option~Result~P, ClientError~~
+poll_next(cx &mut Context) Poll~Option~Result~P, ClientError~~~
}
class WireframeClient {
<<generic<S, T, C>>>
-framed: Framed~T, LengthDelimitedCodec~
-serializer: S
-codec_config: ClientCodecConfig
-correlation_counter: AtomicU64
+call_streaming~P: Packet~(request P) Result~ResponseStream~P, S, T, C~, ClientError~
+receive_streaming~P: Packet~(correlation_id u64) ResponseStream~P, S, T, C~
+send_envelope(envelope Envelope) Result~u64, ClientError~
+receive_envelope() Result~Envelope, ClientError~
}
Packet <.. ResponseStream : P - Packet
WireframeClient o--> ResponseStream : borrows_mutably
ClientError <.. ResponseStream : uses
WireframeClient --> ClientError : returns
Flow diagram for ResponseStream poll_next and process_frameflowchart TD
A_start[poll_next called] --> B_checkTerminated{terminated?}
B_checkTerminated -- yes --> Z_returnNone[Return None]
B_checkTerminated -- no --> C_pollFramed[Poll client.framed.next]
C_pollFramed --> D_state{Result}
D_state -- Pending --> E_pending[Return Poll Pending]
D_state -- Ready_None --> F_connClosed[Set terminated = true]
F_connClosed --> F1_error[Return Some Err ClientError_disconnected]
D_state -- Ready_Err --> G_transportErr[Set terminated = true]
G_transportErr --> G1_error[Return Some Err ClientError_from_transport]
D_state -- Ready_Ok_bytes --> H_processFrame[process_frame bytes]
H_processFrame --> I_deserialize{deserialize ok?}
I_deserialize -- no --> J_decodeErr[Set terminated = true]
J_decodeErr --> J1_error[Return Some Err ClientError_decode]
I_deserialize -- yes --> K_checkCid{received_cid == Some correlation_id?}
K_checkCid -- no --> L_cidMismatch[Set terminated = true]
L_cidMismatch --> L1_error[Return Some Err StreamCorrelationMismatch]
K_checkCid -- yes --> M_checkTerminator{packet.is_stream_terminator?}
M_checkTerminator -- yes --> N_setTerminated[Set terminated = true]
N_setTerminated --> O_returnNone[Return None]
M_checkTerminator -- no --> P_yield[Return Some Ok packet]
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
WalkthroughImplement client-side streaming response consumption via a new ResponseStream type; add Changes
Sequence DiagramsequenceDiagram
participant Client as Client Code
participant WFC as WireframeClient
participant RS as ResponseStream
participant Transport as Framed\<T>
participant Server as Server
Client->>WFC: call_streaming(request)
WFC->>WFC: compute/assign correlation_id
WFC->>WFC: serialise request
WFC->>Transport: send bytes
Transport->>Server: transmit
WFC-->>Client: ResponseStream{correlation_id}
loop Poll next frame
Client->>RS: poll_next()
RS->>Transport: poll (read frame bytes)
Transport->>Server: await
Server-->>Transport: frame bytes or EOF
Transport-->>RS: Poll::Ready(bytes)
alt Decode success
RS->>RS: validate correlation_id
alt ID matches
RS->>RS: check packet.is_stream_terminator()
alt Terminator detected
RS->>RS: set terminated = true
RS-->>Client: Poll::Ready(None)
else Data frame
RS-->>Client: Poll::Ready(Some(Ok(packet)))
end
else ID mismatch
RS->>RS: set terminated = true
RS-->>Client: Poll::Ready(Some(Err(StreamCorrelationMismatch)))
end
else Decode failure
RS->>RS: set terminated = true
RS-->>Client: Poll::Ready(Some(Err(decode error)))
end
end
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Comment |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Comment on lines +424 to +450 async fn receive_streaming_works_with_pre_sent_request(correlation_id: u64) {
let frames = vec![
TestStreamEnvelope::data(1, correlation_id, vec![55]),
TestStreamEnvelope::terminator(correlation_id),
];
let server = spawn_streaming_server(frames).await;
let mut client = WireframeClient::builder()
.connect(server.addr)
.await
.expect("connect client");
// Send the request manually via send_envelope.
let request = TestStreamEnvelope::data(99, correlation_id, vec![]);
let cid = client.send_envelope(request).await.expect("send");
// Use receive_streaming to consume the response.
let mut stream = client.receive_streaming::<TestStreamEnvelope>(cid);
let frame = stream.next().await.expect("data frame").expect("Ok");
assert_eq!(frame.payload, vec![55]);
let end = stream.next().await;
assert!(end.is_none(), "stream should terminate");
server.handle.abort();
}❌ New issue: Code Duplication |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Comment on file //! Unit tests for client streaming response APIs.
❌ New issue: Primitive Obsession |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: tests/steps/client_streaming_steps.rs Comment on lines +20 to +27 fn given_mismatch_server(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
client_streaming_world.abort_server();
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
client_streaming_world.start_mismatch_server().await?;
client_streaming_world.connect_client().await
})
}❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
… setup - Extract test server spawning helper functions with detailed control - Introduce TestServer struct managing server task lifecycle - Replace multiple test server functions with unified approach supporting terminator control - Simplify client connection setup in tests via helper functions - Add flowchart diagram documenting ResponseStream poll_next control flow logic - Refactor BDD step definitions to reuse server restart helper - Clean up test code for clarity and reduce boilerplate This refactor improves test reliability and readability, providing clearer server lifecycle handling and more consistent test setup patterns. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Hey - I've found 5 issues, and left some high level feedback:
- The
ClientError::StreamTerminatedvariant is documented in the design/user docs as being returned when polling after stream end, but the currentResponseStreamimplementation never constructs it (and instead just returnsNoneonceterminatedis set); consider either wiring this error intopoll_nextor updating/removing the variant and docs to keep behavior consistent. - The BDD step implementations in
tests/steps/client_streaming_steps.rscreate a newtokio::runtime::Runtimeand block on it per step, which is relatively heavy and easy to misuse; consider using async test/step support or a shared runtime instead of per-step runtime construction and blocking.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `ClientError::StreamTerminated` variant is documented in the design/user docs as being returned when polling after stream end, but the current `ResponseStream` implementation never constructs it (and instead just returns `None` once `terminated` is set); consider either wiring this error into `poll_next` or updating/removing the variant and docs to keep behavior consistent.
- The BDD step implementations in `tests/steps/client_streaming_steps.rs` create a new `tokio::runtime::Runtime` and block on it per step, which is relatively heavy and easy to misuse; consider using async test/step support or a shared runtime instead of per-step runtime construction and blocking.
## Individual Comments
### Comment 1
<location> `src/app/envelope.rs:102-111` </location>
<code_context>
+ /// fn set_correlation_id(&mut self, cid: Option<u64>) { self.correlation_id = cid; }
+ /// }
+ ///
+ /// impl Packet for MyFrame {
+ /// fn id(&self) -> u32 { self.id }
+ /// fn into_parts(self) -> PacketParts {
+ /// PacketParts::new(self.id, self.correlation_id, self.payload)
+ /// }
+ /// fn from_parts(parts: PacketParts) -> Self {
+ /// Self {
+ /// id: parts.id(),
+ /// correlation_id: parts.correlation_id(),
+ /// payload: parts.into_payload(),
+ /// }
+ /// }
+ /// fn is_stream_terminator(&self) -> bool { self.id == 0 }
+ /// }
+ ///
</code_context>
<issue_to_address>
**issue (bug_risk):** Doc example for `Packet` is unlikely to compile because `MyFrame` does not implement the `Message` supertrait.
Because `Packet` requires `Message` (and others), this impl will not compile unless `MyFrame` also implements `Message`. As this is a normal doc test, it will fail during docs build. Consider either adding a `Message` impl for `MyFrame` in the example or marking the example with `ignore`/`no_compile` if you don’t want to depend on the real `Message` implementation.
</issue_to_address>
### Comment 2
<location> `src/client/tests/streaming.rs:254-277` </location>
<code_context>
+
+#[rstest]
+#[tokio::test]
+async fn response_stream_validates_correlation_id(correlation_id: u64) {
+ let wrong_cid = correlation_id + 999;
+ let server = spawn_mismatch_server(wrong_cid).await;
+ let mut client = create_test_client(server.addr).await;
+
+ let mut request = TestStreamEnvelope::data(99, correlation_id, vec![]);
+ request.set_correlation_id(Some(correlation_id));
+
+ let mut stream = client
+ .call_streaming::<TestStreamEnvelope>(request)
+ .await
+ .expect("call_streaming");
+
+ let result = stream.next().await;
+ match result {
+ Some(Err(ClientError::StreamCorrelationMismatch { expected, received })) => {
+ assert_eq!(expected, Some(correlation_id));
+ assert_eq!(received, Some(wrong_cid));
+ }
+ other => panic!("expected StreamCorrelationMismatch, got {other:?}"),
+ }
+}
</code_context>
<issue_to_address>
**suggestion (testing):** Add an assertion that the stream is marked terminated and does not yield further items after a StreamCorrelationMismatch error.
This test only checks that the first item is a `StreamCorrelationMismatch`. Please extend it to also assert that, after the mismatch:
- `stream.is_terminated()` is `true`, and
- a subsequent `stream.next().await` returns `None` (or the appropriate terminal result).
This will verify the stream state machine and ensure we don’t keep reading or yielding frames after a correlation mismatch.
```suggestion
#[rstest]
#[tokio::test]
async fn response_stream_validates_correlation_id(correlation_id: u64) {
let wrong_cid = correlation_id + 999;
let server = spawn_mismatch_server(wrong_cid).await;
let mut client = create_test_client(server.addr).await;
let mut request = TestStreamEnvelope::data(99, correlation_id, vec![]);
request.set_correlation_id(Some(correlation_id));
let mut stream = client
.call_streaming::<TestStreamEnvelope>(request)
.await
.expect("call_streaming");
let result = stream.next().await;
match result {
Some(Err(ClientError::StreamCorrelationMismatch { expected, received })) => {
assert_eq!(expected, Some(correlation_id));
assert_eq!(received, Some(wrong_cid));
}
other => panic!("expected StreamCorrelationMismatch, got {other:?}"),
}
// After a correlation mismatch, the stream should be marked terminated
// and yield no further items.
assert!(
stream.is_terminated(),
"stream should be terminated after correlation mismatch"
);
let next = stream.next().await;
assert!(
next.is_none(),
"no further items should be yielded after correlation mismatch"
);
}
```
</issue_to_address>
### Comment 3
<location> `src/client/tests/streaming.rs:78-83` </location>
<code_context>
+ }
+}
+
+/// Serializes a `TestStreamEnvelope` to bytes for transmission.
+fn serialize_envelope(envelope: &TestStreamEnvelope) -> Bytes {
+ Bytes::from(
+ BincodeSerializer
+ .serialize(envelope)
+ .expect("serialize test envelope"),
+ )
+}
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for how ResponseStream behaves when it receives malformed or undecodable frames.
Existing tests cover happy paths and some error cases, but not deserialization failure inside `process_frame` (where `serializer.deserialize` returns an error).
Please add a test that sends invalid bincode for `TestStreamEnvelope` (e.g., truncated or random bytes) from a test server, then asserts that:
- `stream.next().await` yields `Some(Err(ClientError::Decode(_)))` (or the appropriate variant), and
- the stream terminates afterward.
This will verify that decode failures are handled deterministically and stop further polling of the transport.
Suggested implementation:
```rust
/// Serializes a `TestStreamEnvelope` to bytes for transmission.
fn serialize_envelope(envelope: &TestStreamEnvelope) -> Bytes {
Bytes::from(
BincodeSerializer
.serialize(envelope)
.expect("serialize test envelope"),
)
}
/// Ensures that `ResponseStream` surfaces decode errors and terminates after a
/// malformed frame is received.
#[tokio::test]
async fn response_stream_terminates_after_decode_error() {
use futures::StreamExt;
// Set up a test transport and response stream using the existing test
// harness types used elsewhere in this file.
//
// We assume there is a constructor similar to `TestClientTransport::new`
// that takes a sender/receiver of `Bytes` and that `ResponseStream` is
// constructed in the same way as other tests in this module.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
// Replace `TestClientTransport::new(rx)` and `ResponseStream::new(...)`
// with the concrete helpers/constructors already used in this file.
let transport = TestClientTransport::new(rx);
let mut stream: ResponseStream<TestStreamEnvelope, BincodeSerializer> =
ResponseStream::new(transport, BincodeSerializer);
// Send invalid / undecodable bincode bytes for `TestStreamEnvelope`.
//
// We deliberately send something that is not a valid `TestStreamEnvelope`
// encoding so that `serializer.deserialize` inside `process_frame`
// returns an error.
let invalid_bytes = Bytes::from_static(b"\x01\x02\x03\x04");
tx.send(invalid_bytes).expect("send invalid frame");
// First poll should yield a decode error.
let first = stream.next().await;
match first {
Some(Err(ClientError::Decode(_))) => {}
other => panic!("expected decode error from stream, got: {:?}", other),
}
// After a decode failure, the stream should terminate and not yield any
// further items on subsequent polls.
let second = stream.next().await;
assert!(
second.is_none(),
"stream should terminate after decode error, got: {:?}",
second
);
}
```
The suggested test assumes the presence of several items that you likely already have in this test module:
1. A `TestClientTransport` type (or equivalent) that can be constructed from a `tokio::sync::mpsc::UnboundedReceiver<Bytes>` (or similar). Update `TestClientTransport::new(rx)` to match the actual constructor in your test harness.
2. A `ResponseStream<TestStreamEnvelope, BincodeSerializer>` type and a `ResponseStream::new(transport, BincodeSerializer)` constructor. Adjust the type parameters and constructor call to match your existing tests.
3. A `ClientError::Decode` variant. If the decode error is represented differently (e.g., `ClientError::Deserialization` or wrapped in another error type), update the `match` arm accordingly.
4. Ensure that the necessary imports are present at the top of the file:
- `use bytes::Bytes;`
- `use crate::client::error::ClientError;`
- `use crate::client::streaming::ResponseStream;`
- `use crate::client::tests::util::TestClientTransport;` (or wherever your test transport lives)
- `use crate::client::tests::streaming::TestStreamEnvelope;` (if not already in scope)
- `use crate::serialization::BincodeSerializer;`
- `use futures::StreamExt;`
5. If your existing tests use a different async test attribute (e.g., `#[async_std::test]` or `#[tokio::test(flavor = "current_thread")]`), adjust the attribute on the new test accordingly.
Once these are wired to your actual types and constructors, this test will verify that a decode failure in `process_frame` produces a `ClientError::Decode` and that the `ResponseStream` terminates afterward.
</issue_to_address>
### Comment 4
<location> `docs/execplans/10-3-1-client-response-stream-and-response-multipacket.md:48` </location>
<code_context>
+- All code must pass `make check-fmt`, `make lint`, and `make test`.
+- Documentation must use en-GB-oxendict spelling per `AGENTS.md`.
+- No single source file may exceed 400 lines per `AGENTS.md`.
+- BDD tests must use `rstest-bdd` 0.5.0 (not the removed Cucumber
+ framework).
+
</code_context>
<issue_to_address>
**issue (review_instructions):** The acronym “BDD” is introduced without being expanded on first use; it should be defined (for example, “behaviour-driven development (BDD)”).
To comply with the style guidance, expand the acronym the first time it appears in this document, for example:
"- Behaviour-driven development (BDD) tests must use `rstest-bdd` 0.5.0 (not the removed Cucumber framework)."
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Define uncommon acronyms on first use.
</details>
</issue_to_address>
### Comment 5
<location> `docs/execplans/10-3-1-client-response-stream-and-response-multipacket.md:226` </location>
<code_context>
+
+### Back-pressure
+
+Back-pressure is natural via TCP flow control. If the client reads slowly from
+the socket, the TCP receive buffer fills, the server's TCP send buffer fills,
+the server's `write` suspends, and the server stops polling the response
</code_context>
<issue_to_address>
**issue (review_instructions):** The acronym “TCP” is used here without being expanded on first use in this document; it should be defined when first mentioned.
Consider expanding TCP the first time it appears, for example:
"Back-pressure is natural via Transmission Control Protocol (TCP) flow control. If the client reads slowly from …"
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Define uncommon acronyms on first use.
</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 09215f7f36
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@docs/multi-packet-and-streaming-responses-design.md`:
- Around line 641-643: The doc uses the term serializer as an API identifier but
doesn't mark the spelling explicitly; update the sentence "Deserializes it into
`P` using the client's configured serializer." to wrap the API term serializer
in inline code formatting (i.e., mark `serializer`) so the US spelling is
clearly shown as an identifier; ensure the same treatment is applied wherever
the API term serializer appears (e.g., in related sentences or bullets) to
conform to the coding guideline about US spelling for API identifiers.
- Around line 648-697: The HTML comment containing the accessible description
for the Mermaid diagram should be moved out so it is visible to screen readers;
create a short prose paragraph immediately before the ```mermaid``` block that
describes the control flow of ResponseStream::poll_next (mentioning terminated
check, polling client.framed.next, the four poll outcomes: Pending,
Ready(None)=disconnected, Ready(Err)=transport error, Ready(Ok
bytes)=process_frame and its subchecks including deserialize failure,
correlation id mismatch, stream terminator and yielding packet) so the
description is not inside an HTML comment and appears as normal text before the
diagram.
In `@docs/users-guide.md`:
- Around line 1317-1320: Replace the British spelling "recognise" with the
Oxford English spelling "recognize" in the documentation sentence describing
Packet::is_stream_terminator so the line reads that protocols override it to
"recognize their terminator format"; update any adjacent occurrences in the same
paragraph mentioning Packet::is_stream_terminator to maintain consistent
en-GB-oxendict -ize spelling.
In `@src/client/tests/streaming.rs`:
- Around line 1-5: This file exceeds the 400-line guideline because test
infrastructure is embedded in the streaming tests; move helpers like
TestStreamEnvelope and TestServer out of src/client/tests/streaming.rs into a
new sibling module or fixture file (e.g., src/client/tests/streaming_fixtures.rs
or a nested mod streaming::fixtures) and update streaming.rs to import them (use
crate::tests::streaming_fixtures::{TestStreamEnvelope, TestServer} or the
appropriate path); ensure visibility (pub or pub(crate)) and adjust any module
paths in tests so the tests compile and the main streaming.rs falls under 400
lines.
- Around line 78-169: Change the shared test helpers to return Result instead of
panicking: update serialize_envelope, spawn_test_server, spawn_mismatch_server,
and create_test_client to return Result<...> (appropriate concrete Result types
for their return values) and remove all .expect(...) calls, using the ? operator
to propagate errors from TcpListener::bind, listener.local_addr,
listener.accept, transport.send/next, BincodeSerializer.serialize, and
WireframeClient::builder().connect; adjust TestServer construction to return
Ok(TestServer { ... }) and update all test call sites to use ? (or .await?) so
errors bubble up rather than causing panics. Ensure function signatures and
return types (e.g., Result<TestServer, E> and Result<WireframeClient<...>, E>)
are updated consistently.
- Around line 20-34: Replace the primitive type aliases CorrelationId,
MessageId, and Payload with NewType tuple structs (either via newt-hype newtype!
or explicit tuple structs) and update TestStreamEnvelope to use those new types
for fields (id: MessageId, correlation_id: Option<CorrelationId>, payload:
Payload); change TERMINATOR_ID to the corresponding MessageId newtype value and
propagate the new types through all relevant functions and tests, specifically
updating signatures and usages in serialize_envelope, spawn_test_server,
spawn_mismatch_server, and any test fixtures or comparisons to construct and
pattern-match the new tuple structs instead of raw u32/u64/Vec<u8>.
In `@tests/fixtures/client_streaming.rs`:
- Around line 77-87: The function serialize_to_bytes currently panics via
.expect() and uses an allow for clippy::expect_used; remove that suppression and
change the signature of serialize_to_bytes to return Result<Bytes, E> (e.g.,
Result<Bytes, bincode::Error> or impl From<BincodeError>) instead of Bytes, and
replace the .expect("serialize test frame") with returning the serialization
error (i.e., map the BincodeSerializer.serialize(self) result into Bytes on Ok
and propagate Err). Update callers of serialize_to_bytes to propagate or handle
the Result (they already handle Results in tests), and remove the test-only
rationale comment.
- Around line 322-333: Remove the clippy suppression and replace the unchecked
casts in send_data_frames: change the usages of i as u32 and i as u8 when
building StreamTestEnvelope::data to fallible conversions
(u32::try_from(i).expect("count is bounded by test data") and
u8::try_from(i).expect("count is bounded by test data")), and remove the
#[expect(clippy::cast_possible_truncation,...)] attribute; apply the same
replacement in the verify_frame_order helper where i is cast to u32/u8 so all
potential truncations use try_from with the same explanatory expect message.
…e tests - Extract shared test infrastructure into src/client/tests/streaming_infra.rs to reduce duplication and improve code organization. - Introduce newtypes (CorrelationId, MessageId, Payload) for clearer test semantics. - Replace ad-hoc serialization helpers with fallible versions. - Add detailed async test functions with improved error handling and results. - Cover more edge cases: decode errors, correlation ID mismatches, connection closes. - Update existing tests for better clarity and correctness with explicit correlation IDs and consistent payload construction. - Minor documentation and style improvements in test-related code. This improves maintainability and robustness of client streaming functionality tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/execplans/10-3-1-client-response-stream-and-response-multipacket.md`:
- Around line 15-22: This section must explicitly state that the multi-packet
API is tuple-based and call out the helper constructors that prepare the
response channel pair: add a short note after the paragraph describing
Response::Stream and Response::MultiPacket explaining that multi-packet
responses are represented as a tuple-based API and mention the helper
constructors Response::with_channel and Response::with_channel_and_initial
(which create the channel pair and optionally include an initial frame). Ensure
the wording is brief, follows the existing style, and references those
constructor names exactly.
- Around line 24-29: Reflow the long paragraph and any other over-80-column
lines in this section so all Markdown paragraphs and bullets are wrapped at 80
columns; specifically rewrap the sentence describing Observable outcome (the
line containing client.call_streaming(request), ResponseStream<P>, Result<P,
ClientError>, and the list of tests like single-frame, multi-frame,
empty-stream, terminator validation, correlation mismatch, and back-pressure) so
no line exceeds 80 characters while preserving the original wording and
punctuation.
- Around line 48-49: The docs still mention the removed Cucumber framework;
update the sentence that currently reads "BDD tests must use `rstest-bdd` 0.5.0
(not the removed Cucumber framework)" to remove any reference to Cucumber and
state rstest-bdd-only wording (e.g., "BDD tests must use `rstest-bdd` 0.5.0").
Ensure other occurrences of "Cucumber" in this document are replaced or retired
so all examples and guides refer only to `rstest-bdd`.
In `@docs/multi-packet-and-streaming-responses-design.md`:
- Around line 648-659: The paragraph describing the decision tree for
ResponseStream::poll_next is too long and must be reflowed to 80-column width;
split the long sentence into multiple wrapped lines so each line is <=80 columns
while preserving the exact technical terms and order (ResponseStream::poll_next,
Framed, process_frame, is_stream_terminator, StreamCorrelationMismatch), keeping
the same logical flow (terminated check, Framed poll outcomes
Pending/Ready(None)/Ready(Err)/Ready(Ok(bytes)), process_frame deserialize and
is_stream_terminator checks, correlation ID validation) and maintaining
punctuation and code span formatting.
- Around line 720-727: The docs list ClientError::StreamTerminated as a
ResponseStream error but ResponseStream::poll_next currently returns None on
termination and never yields ClientError::StreamTerminated; update the docs to
remove or reword that row to state that terminated streams cause poll_next to
return None (no error), or alternatively implement ClientError::StreamTerminated
by changing ResponseStream::poll_next to return
Err(ClientError::StreamTerminated) when polling after termination — touch the
ResponseStream::poll_next implementation and the ClientError enum
(ClientError::StreamTerminated) accordingly depending on which behavior you
choose so code and docs match.
In `@docs/users-guide.md`:
- Around line 1309-1313: Reflow the new client streaming documentation so all
Markdown paragraphs and list items are wrapped at 80 columns: break long lines
in the paragraph that mentions Response::Stream, Response::MultiPacket and the
methods call_streaming/receive_streaming on WireframeClient, and wrap the
sentence describing the returned ResponseStream and its behavior into multiple
lines not exceeding 80 characters per line; preserve punctuation, code spans,
and list structure while ensuring no line exceeds the 80-column limit.
In `@src/app/envelope.rs`:
- Around line 83-115: The doctest for MyFrame uses the wrong bincode derive:
update the derives on the MyFrame struct so it implements Encode and
BorrowDecode (for<'de> BorrowDecode<'de, ()>) instead of Decode; locate the
MyFrame definition in the example and replace the bincode::Decode derive with
bincode::BorrowDecode so MyFrame satisfies the Message trait bounds used by
Packet/PacketParts.
In `@tests/fixtures/client_streaming.rs`:
- Around line 25-83: The StreamTestEnvelope currently uses primitives (u32, u64,
Vec<u8>) causing "integer soup"; replace those with NewType wrappers (e.g.,
MessageId, CorrelationId, Payload from newt-hype or custom tuple structs) and
update all related code: change the struct fields in StreamTestEnvelope, update
TERMINATOR_ID to a MessageId constant, adjust Packet impl methods (id,
into_parts, from_parts, is_stream_terminator) to work with the new types, and
update helper constructors StreamTestEnvelope::data and ::terminator and
serialize_to_bytes to accept/produce the NewTypes; ensure comparisons
(is_stream_terminator) and conversions to/from PacketParts use the appropriate
newtype accessors or conversions.
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Comment on lines +334 to +358 async fn receive_streaming_works_with_pre_sent_request(
correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cid = correlation_id;
let frames = vec![
TestStreamEnvelope::data(MessageId::new(1), cid, Payload::new(vec![55])),
TestStreamEnvelope::terminator(cid),
];
let (mut client, _server) = setup_streaming_test(frames).await?;
// Send the request manually via send_envelope.
let request = TestStreamEnvelope::data(MessageId::new(99), cid, Payload::new(vec![]));
let sent_cid = client.send_envelope(request).await.expect("send");
// Use receive_streaming to consume the response.
let mut stream = client.receive_streaming::<TestStreamEnvelope>(sent_cid);
let frame = stream.next().await.expect("data frame").expect("Ok");
assert_eq!(frame.payload, vec![55]);
let end = stream.next().await;
assert!(end.is_none(), "stream should terminate");
Ok(())
}❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
…d-stream termination - Removed the StreamTerminated variant from ClientError. - ResponseStream now returns None after termination or fatal error, following futures::Stream fused-stream idiom. - Updated docs and tests to reflect the removal and use of None to indicate stream end. - Refactored client streaming tests and fixtures to align with the fused-stream pattern. - Simplified error handling and documentation accordingly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…or handling - Introduce strongly typed wrappers (CorrelationId, MessageId, Payload) for stream data elements. - Update StreamTestEnvelope and related traits to use these types for better type safety. - Handle tokio runtime creation errors gracefully in ClientStreamingWorld with optional error storage. - Change test steps to propagate errors instead of panicking. - Minor doc and error comment corrections related to streaming responses. - Remove some redundant attributes and improve encoding derives. This refactor enhances type safety for streaming data, improves error handling in tests, and cleans up test fixtures to reduce panic risk and improve clarity. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…ssure in docs Enhanced the users guide to explicitly explain that ResponseStream holds an exclusive mutable borrow on WireframeClient, preventing other I/O until the stream is dropped or consumed. Also clarified TCP back-pressure behavior in client streaming section. Additional minor improvements: - Reformatted error display annotation for StreamCorrelationMismatch. - Added bincode::BorrowDecode derive macros for test fixture types. - Minor tidying in client_streaming tests, including server abort before start and error verification refinement. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…ules - Extract server helper functions into tests/fixtures/client_streaming/server.rs - Extract types and StreamTestEnvelope into tests/fixtures/client_streaming/types.rs - Clean up imports and mod declarations in tests/fixtures/client_streaming.rs This refactor improves organization and modularity of client streaming test fixtures. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…ope fields Add detailed doc comments to the `id`, `correlation_id`, and `payload` fields of the `StreamTestEnvelope` struct to clarify their roles and behavior, improving code readability and maintainability in the client_streaming tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 1181 🏁 Script executed: Length of output: 5735 🏁 Script executed: Length of output: 3633
Excellent news! All review comments have now been fully resolved. Here's the complete status: ✅ All Resolved (7 out of 7)Overall Comment 1 (ClientError::StreamTerminated): ✅ RESOLVED Overall Comment 2 (BDD per-step runtime): ✅ RESOLVED Individual Comment 1 (Packet doc example): ✅ RESOLVED Individual Comment 2 (Correlation test assertions): ✅ RESOLVED
Individual Comment 3 (Decode error test): ✅ RESOLVED Individual Comment 4 (BDD acronym): ✅ RESOLVED Individual Comment 5 (TCP acronym): ✅ RESOLVED All outstanding review items have been addressed. The PR is now fully compliant with the review requirements. |
Summary
Changes
API surface
Wiring and module structure
Implementation details
Terminator behavior
Testing
Documentation
Rationale / Design notes
Testing plan
How to test locally
Compatibility and migration
◳ Generated by DevBoxer ◰
📎 Task: https://www.devboxer.com/task/a01b9779-887b-49af-956b-8a345fe84d0e