Signal end of stream with terminator frame#314
Conversation
There was a problem hiding this comment.
Sorry @leynos, you have reached your 24-hour rate limit for Sourcery. Please try again later
|
Warning Rate limit exceeded@leynos has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 8 minutes and 0 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (7)
Summary by CodeRabbit
WalkthroughIntroduce explicit end-of-stream signalling via a new protocol hook and update connection logic to emit an optional terminator frame; expand documentation (hardening, capability maturity, roadmap) and add cucumber and unit tests validating terminator emission and DLQ guidance. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ConnectionActor
participant Stream as ResponseStream
participant Hooks as ProtocolHooks
participant Proto as WireframeProtocol
participant Out as OutboundQueue
Client->>ConnectionActor: Issue command
ConnectionActor->>Stream: Poll next()
alt Frame available
Stream-->>ConnectionActor: Some(frame)
ConnectionActor->>Out: push(frame)
ConnectionActor-->>Client: produced = true (internal)
loop Poll until end
ConnectionActor->>Stream: Poll next()
Stream-->>ConnectionActor: Some(frame) / None
end
end
opt Stream ends
Stream-->>ConnectionActor: None
ConnectionActor->>Hooks: stream_end(ctx)
Hooks->>Proto: stream_end_frame(ctx)
Proto-->>Hooks: Option<terminator>
alt Terminator present
Hooks-->>ConnectionActor: Some(terminator)
ConnectionActor->>Out: push(terminator)
else No terminator
Hooks-->>ConnectionActor: None
end
ConnectionActor->>Client: on_command_end (internal)
end
sequenceDiagram
participant ConnectionActor
participant Hooks
participant Logger as ErrorLogger
ConnectionActor->>ConnectionActor: handle_response()
alt Protocol error
ConnectionActor->>Logger: log error
ConnectionActor->>Hooks: handle_error(...)
ConnectionActor->>ConnectionActor: mark closed
ConnectionActor->>ConnectionActor: on_command_end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~40 minutes Possibly related PRs
Poem
✨ Finishing Touches
🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 7
🔭 Outside diff range comments (1)
src/connection.rs (1)
406-435: Prevent double termination signalling and repeated closure on protocol errorsOn
Some(Err(WireframeError::Protocol(_))), the code marks the source closed and callson_command_end, but leavesself.responseintact. The next poll will likely produceNone, causing:
- A second
state.mark_closed()for the same source- A second
on_command_endinvocation- Potentially skewed closed-source accounting
Clear the response immediately on protocol errors to terminate polling of this source.
Apply:
Some(Err(WireframeError::Protocol(e))) => { warn!(error = ?e, "protocol error"); self.hooks.handle_error(e, &mut self.ctx); state.mark_closed(); + // Stop polling the response after a protocol error to avoid + // double-closing and duplicate `on_command_end` signalling. + self.response = None; self.hooks.on_command_end(&mut self.ctx); crate::metrics::inc_handler_errors(); }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- Jira integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (11)
docs/hardening-wireframe-a-guide-to-production-resilience.md(1 hunks)docs/roadmap.md(1 hunks)docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md(1 hunks)src/connection.rs(3 hunks)src/hooks.rs(6 hunks)tests/cucumber.rs(2 hunks)tests/features/stream_end.feature(1 hunks)tests/steps/mod.rs(1 hunks)tests/steps/stream_end_steps.rs(1 hunks)tests/stream_end.rs(1 hunks)tests/world.rs(3 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.md
⚙️ CodeRabbit Configuration File
**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")
- Use en-GB-oxendict (-ize / -our) spelling and grammar
- Headings must not be wrapped.
- Documents must start with a level 1 heading
- Headings must correctly increase or decrease by no more than one level at a time
- Use GitHub-flavoured Markdown style for footnotes and endnotes.
- Numbered footnotes must be numbered by order of appearance in the document.
Files:
docs/roadmap.mddocs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.mddocs/hardening-wireframe-a-guide-to-production-resilience.md
**/*.rs
⚙️ CodeRabbit Configuration File
**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.
Adhere to single responsibility and CQRS
Place function attributes after doc comments.
Do not use
returnin single-line functions.Move conditionals with >2 branches into a predicate function.
Avoid
unsafeunless absolutely necessary.Every module must begin with a
//!doc comment that explains the module's purpose and utility.Comments and docs must follow en-GB-oxendict (-ize / -our) spelling and grammar
Lints must not be silenced except as a last resort.
#[allow]is forbidden.- Only narrowly scoped
#[expect(lint, reason = "...")]is allowed.- No lint groups, no blanket or file-wide suppression.
- Include
FIXME:with link if a fix is expected.Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.
Use
rstestfixtures for shared setup and to avoid repetition between tests.Replace duplicated tests with
#[rstest(...)]parameterised cases.Prefer
mockallfor mocks/stubs.Prefer
.expect()over.unwrap()Ensure that any API or behavioural changes are reflected in the documentation in
docs/Ensure that any completed roadmap steps are recorded in the appropriate roadmap in
docs/Files must not exceed 400 lines in length
- Large modules must be decomposed
- Long match statements or dispatch tables should be decomposed by domain and collocated with targets
- Large blocks of inline data (e.g., test fixtures, constants or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
Files:
tests/steps/stream_end_steps.rstests/stream_end.rstests/cucumber.rstests/steps/mod.rstests/world.rssrc/connection.rssrc/hooks.rs
🧬 Code Graph Analysis (4)
tests/stream_end.rs (5)
src/hooks.rs (1)
stream_end_frame(61-61)tests/world.rs (1)
stream_end_frame(173-175)tests/connection_actor.rs (1)
queues(25-25)src/push.rs (1)
bounded(297-300)src/connection.rs (1)
with_hooks(155-185)
tests/world.rs (3)
src/hooks.rs (2)
stream_end_frame(61-61)from_protocol(140-176)src/push.rs (1)
bounded(297-300)src/connection.rs (4)
new(30-34)new(138-151)new(514-523)with_hooks(155-185)
src/connection.rs (1)
src/metrics.rs (2)
inc_frames(70-72)inc_frames(75-75)
src/hooks.rs (4)
tests/stream_end.rs (1)
stream_end_frame(20-20)tests/world.rs (1)
stream_end_frame(173-175)src/app.rs (4)
protocol(546-550)new(218-224)new(242-248)new(380-380)src/connection.rs (3)
new(30-34)new(138-151)new(514-523)
🪛 GitHub Check: build-test
tests/world.rs
[warning] 197-197:
Diff in /home/runner/work/wireframe/wireframe/tests/world.rs
[warning] 170-170:
Diff in /home/runner/work/wireframe/wireframe/tests/world.rs
[warning] 197-197:
Diff in /home/runner/work/wireframe/wireframe/tests/world.rs
[warning] 170-170:
Diff in /home/runner/work/wireframe/wireframe/tests/world.rs
🪛 GitHub Actions: CI
tests/world.rs
[warning] 170-170: cargo fmt --check detected a formatting difference in function 'stream_end_frame' at line 170.
[warning] 197-197: cargo fmt --check detected a formatting difference in function 'verify' at line 197.
🪛 LanguageTool
docs/hardening-wireframe-a-guide-to-production-resilience.md
[style] ~327-~327: Would you like to use the Oxford spelling “recognize”? The spelling ‘recognise’ is also correct.
Context: ...load. This explicit marker lets clients recognise that the logical stream has ended and h...
(OXFORD_SPELLING_Z_NOT_S)
🔍 Remote MCP (Deepwiki)
Here is the additional context uncovered about protocol hooks and end‐of‐stream signaling in the existing codebase:
-
ProtocolHooks definition (src/hooks.rs)
- Struct ProtocolHooks<F, E> encapsulates four optional callbacks:
• on_connection_setup: Option<OnConnectionSetupHook>
• before_send: Option<BeforeSendHook>
• on_command_end: Option
• handle_error: Option<HandleErrorHook>
(All are Option-wrapped closures invoked by ConnectionActor.)
- Struct ProtocolHooks<F, E> encapsulates four optional callbacks:
-
ConnectionActor’s use of on_command_end (src/connection.rs)
- In handle_response, ConnectionActor invokes
•self.hooks.on_command_end(&mut self.ctx)when the response stream returns None (normal end‐of‐stream)
• After a WireframeError::Protocol, following handle_error, to signal command/stream termination
- In handle_response, ConnectionActor invokes
-
No existing stream_end_frame hook
- The WireframeProtocol trait in src/hooks.rs defines on_command_end but has no stream_end_frame method.
- Searching for “stream_end” in src/hooks.rs returns no matches.
These gaps explain why the PR adds a new stream_end_frame hook and wires it into ProtocolHooks.
This confirms that the PR’s addition of stream_end_frame (and corresponding ProtocolHooks.stream_end field) fills missing support for explicit terminator-frame callbacks.
🔇 Additional comments (17)
tests/steps/mod.rs (1)
8-8: Wire new stream_end_steps into the step aggregator — LGTM.Module wiring is correct and keeps step definitions cohesive.
docs/roadmap.md (1)
192-194: Mark end-of-stream mechanism as complete — LGTM.The checkbox aligns with the new terminator hook and tests.
tests/steps/stream_end_steps.rs (1)
1-10: Step definitions are concise and idiomatic — LGTM.The steps map cleanly to the feature and adhere to en-GB spelling in docs.
tests/features/stream_end.feature (1)
1-4: Feature captures the core behaviour — LGTM.The scenario drives the end-of-stream path clearly.
tests/cucumber.rs (2)
3-7: Document the new StreamEndWorld suite — looks goodThe runner docs clearly list the three suites and map the new feature file.
Also applies to: 14-15
24-30: Wire up StreamEndWorld execution — looks goodThe new suite is imported and executed after the existing ones, preserving the intended order.
tests/stream_end.rs (1)
23-40: Validate terminator emission end-to-end — test is precise and robustThe test constructs a minimal protocol, runs the actor, and asserts the exact frame sequence [1, 2, 0]. This provides solid coverage of the new stream_end hook path.
tests/world.rs (3)
6-6: Import Arc for hook construction — looks goodArc is required for ProtocolHooks::from_protocol; import is correct and scoped.
178-195: StreamEndWorld processing logic — looks goodThe world wires hooks via ProtocolHooks::from_protocol and accurately captures frames. This mirrors the unit test and strengthens integration coverage.
169-176: Fix rustfmt failures in stream_end_framecargo fmt --check flagged formatting. Expand the one-liner to the standard block form to satisfy CI.
Apply:
- fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option<Self::Frame> { - Some(0) - } + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option<Self::Frame> { + Some(0) + }(Keep the multi-line style; rustfmt will normalise spacing.)
Likely an incorrect or invalid review comment.
src/connection.rs (2)
346-354: Yield after response production — looks goodGate after_low() on whether a frame was produced. This prevents spurious fairness resets when the stream yields errors or completes without emitting a frame.
399-405: Document the new return semantics of handle_response — looks goodThe doc comment and signature now convey whether a frame was pushed. This simplifies the caller’s control flow.
src/hooks.rs (5)
77-79: Hook type alias is correct and future-proofUse of
FnMutallows multiple end-of-stream events across a connection and matches the pattern used by other hooks.
90-91: Expose stream_end hook on ProtocolHooksAdd the hook field to the public API to enable protocol-provided terminator frames. The placement and visibility look good.
101-101: Default initialises stream_end to NoneDefault state stays inert until wired via
from_protocol, which is consistent with other hooks.
164-168: Wire protocol method into hook correctlyCapture the protocol object and forward to
stream_end_frame, matching the existing wiring pattern. Closure type and lifetime bounds are correct.
174-175: Include stream_end in constructed hooksPopulate
stream_endwhen building from a protocol, mirroring other hooks. This ensures the hook is available by default.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
tests/world.rs (2)
21-24: Import the shared Terminator from tests/commonAfter extracting the protocol into tests/common, import it here.
use common::unused_listener; +use common::terminator::Terminator;
167-174: Deduplicate the Terminator WireframeProtocol into tests/common and reuseExtract the duplicated Terminator implementation to tests/common and import it here to reduce maintenance and drift between tests/world.rs and tests/stream_end.rs.
Remove the inline definition:
-struct Terminator; - -impl WireframeProtocol for Terminator { - type Frame = u8; - type ProtocolError = (); - - fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option<Self::Frame> { Some(0) } -}Then, introduce a shared module:
- Create tests/common/terminator.rs:
// tests/common/terminator.rs use wireframe::hooks::{ConnectionContext, WireframeProtocol}; #[derive(Debug, Default, Clone, Copy)] pub struct Terminator; impl WireframeProtocol for Terminator { type Frame = u8; type ProtocolError = (); fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option<Self::Frame> { Some(0) } }
- Update tests/common/mod.rs to expose it:
// tests/common/mod.rs pub mod terminator; pub use terminator::Terminator; // existing exports...I can open a follow-up change to perform this extraction end-to-end if helpful.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- Jira integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
tests/world.rs(3 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.rs
📄 CodeRabbit Inference Engine (AGENTS.md)
**/*.rs: Use precise names; boolean names should start with is/has/should
Use en-GB-oxendict spelling and grammar in comments
Function documentation must include clear examples; test documentation should omit redundant examples
Keep code files ≤ 400 lines; split long switch/dispatch logic by feature; move large test data to external files
Disallow Clippy warnings
Fix warnings emitted during tests in code rather than silencing them
Extract helper functions for long functions; adhere to separation of concerns and CQRS
Group related parameters into meaningful structs when functions have many parameters
Consider using Arc for large error returns to reduce data size
Each Rust module must begin with a module-level //! comment describing purpose and utility
Document public APIs with Rustdoc /// comments to enable cargo doc generation
Prefer immutable data; avoid unnecessary mut
Handle errors with Result instead of panicking where feasible
Avoid unsafe code unless necessary and document any usage clearly
Place function attributes after doc comments
Do not use return in single-line functions
Use predicate functions for conditional criteria with more than two branches
Do not silence lints except as a last resort
Lint suppressions must be tightly scoped and include a clear reason
Prefer #[expect(..)] over #[allow(..)] for lints
Prefer .expect() over .unwrap()
Use concat!() to combine long string literals rather than escaping newlines
Prefer single-line function bodies where appropriate (e.g., pub fn new(id: u64) -> Self { Self(id) })
Prefer semantic error enums deriving std::error::Error via thiserror for inspectable conditions
Files:
tests/world.rs
⚙️ CodeRabbit Configuration File
**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.
Adhere to single responsibility and CQRS
Place function attributes after doc comments.
Do not use
returnin single-line functions.Move conditionals with >2 branches into a predicate function.
Avoid
unsafeunless absolutely necessary.Every module must begin with a
//!doc comment that explains the module's purpose and utility.Comments and docs must follow en-GB-oxendict (-ize / -our) spelling and grammar
Lints must not be silenced except as a last resort.
#[allow]is forbidden.- Only narrowly scoped
#[expect(lint, reason = "...")]is allowed.- No lint groups, no blanket or file-wide suppression.
- Include
FIXME:with link if a fix is expected.Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.
Use
rstestfixtures for shared setup and to avoid repetition between tests.Replace duplicated tests with
#[rstest(...)]parameterised cases.Prefer
mockallfor mocks/stubs.Prefer
.expect()over.unwrap()Ensure that any API or behavioural changes are reflected in the documentation in
docs/Ensure that any completed roadmap steps are recorded in the appropriate roadmap in
docs/Files must not exceed 400 lines in length
- Large modules must be decomposed
- Long match statements or dispatch tables should be decomposed by domain and collocated with targets
- Large blocks of inline data (e.g., test fixtures, constants or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
Files:
tests/world.rs
{src,tests}/**/*.rs
📄 CodeRabbit Inference Engine (AGENTS.md)
Write unit and behavioural tests for new functionality
Files:
tests/world.rs
tests/**/*.rs
📄 CodeRabbit Inference Engine (AGENTS.md)
tests/**/*.rs: Use rstest fixtures for shared setup
Replace duplicated tests with #[rstest(...)] parameterised cases
Prefer mockall for mocks/stubs
Files:
tests/world.rs
🧬 Code Graph Analysis (1)
tests/world.rs (4)
src/hooks.rs (2)
stream_end_frame(61-61)from_protocol(140-176)tests/connection_actor.rs (1)
queues(25-25)src/push.rs (1)
bounded(297-300)src/connection.rs (4)
new(30-34)new(138-151)new(514-523)with_hooks(155-185)
🔍 Remote MCP (Deepwiki)
Summary of additional repository context relevant to this PR (findings pulled from project wiki/docs):
-
New hook fits existing ProtocolHooks/hook wiring pattern: ProtocolHooks already stores optional boxed lifecycle hooks (on_connection_setup, before_send, on_command_end, handle_error); the PR’s stream_end_frame + stream_end hook is consistent with that design and is the natural place to add an optional terminator-frame callback.,
-
ConnectionActor / connection.rs behavior and semantics:
-
ConnectionActor uses a biased tokio::select! loop prioritizing shutdown, high-priority pushes, low-priority pushes, then response streams; callers expect handle_response to indicate if an outbound frame was produced so after_low() yields correctly. Emitting a terminator frame from handle_response (setting produced=true) aligns with existing control flow (affects after_low/yield behavior and metrics).
-
Protocol errors are handled via hooks.handle_error and do not always terminate the connection; connection.rs already logs, calls handle_error, updates metrics, and calls on_command_end when needed — the PR’s added protocol-error handling should follow those established patterns.
-
-
Response streaming model:
- Handlers can return Response::Stream and ConnectionActor naturally polls the stream; when stream yields None that is the canonical end-of-stream event and on_command_end is invoked. Adding stream_end_frame to produce an explicit terminator frame on None is consistent with the streaming lifecycle and backpressure model.
-
Push queues / DLQ behavior:
- PushHandle/PushQueues support bounded queues, rate limiting, and optional DLQ routing for TrySend failures; the documented DLQ fallback behavior in docs aligns with the PR’s routing of dropped frames to a configured DLQ.
-
Testing infra:
- Project uses cucumber BDD + many unit/integration tests; adding a StreamEndWorld, feature file, and tests (stream_end.) follows the existing Cucumber world/step patterns (tests/world.rs, tests/steps/.rs, tests/cucumber.rs). The wireframe_testing helpers and in-memory test patterns are available and the new tests follow them.
-
Docs/roadmap alignment:
- Roadmap already had a Phase item to "signal the end of a multi-packet stream (frame with flag/no payload)"; the PR’s docs changes marking that item complete and adding stream_end_frame are consistent with the roadmap entries and the 1.0 streaming/terminator design notes.
Files / behaviors to focus on in code review (based on repo docs):
- src/hooks.rs: trait additions, ProtocolHooks struct default/init, and from_protocol wiring for stream_end hook.
- src/connection.rs: handle_response/process_response changes to return/consume produced boolean and to emit terminator frames and handle protocol errors.
- tests/*: new cucumber feature (tests/features/stream_end.feature), test world (StreamEndWorld), unit/integration tests (tests/stream_end.rs, tests/steps/stream_end_steps.rs) — ensure tests follow project patterns and are deterministic with backpressure/time control if needed.
- Documentation updates: docs/hardening-wireframe-a-guide-to-production-resilience.md, docs/roadmap.md, and the long-form design doc — ensure wording matches implementation (i.e., stream_end_frame optionality and default None).
Tools/sources used:
- Repository wiki/docs content (pages and structures) retrieved via repository documentation reader.,
⏰ Context from checks skipped due to timeout of 120000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build-test
🔇 Additional comments (4)
tests/world.rs (4)
6-6: LGTM: Import Arc is required for ProtocolHooks::from_protocolArc is used when constructing hooks at Line 189; this import is correct.
15-15: LGTM: Wire up ProtocolHooks and WireframeProtocol importsThese imports are necessary for the new terminator hook implementation and hook wiring.
176-193: LGTM: Correctly wire hooks and actor to emit the terminatorThe stream yields 1, 2; with hooks from Terminator, ConnectionActor should append 0 on end-of-stream. Queue sizing and shutdown handling are appropriate for a deterministic test.
198-201: LGTM: Verify terminator appended; formatting compliesThis resolves the earlier rustfmt warning about single-line functions. The equality assertion precisely validates the intended behaviour.
Summary
stream_end_framehook for protocol-defined terminatorsTesting
make fmtmake lintmake testmake markdownlintmake nixie(fails: Cannot find module 'puppeteer-core')https://chatgpt.com/codex/tasks/task_e_689e39e7aa708322aa4adefd6a8867d5