Conversation
Reviewer's Guide

This PR introduces a new push module that provides prioritized, bounded queues for asynchronous frame delivery, exposes it in the public API, enriches the documentation with design diagrams, and adds comprehensive tests covering queue ordering, overflow policies, and closed-queue handling.

Class diagram for the new prioritized push queues module

```mermaid
classDiagram
    class FrameLike {
    }
    class PushPriority {
        <<enum>>
        High
        Low
    }
    class PushPolicy {
        <<enum>>
        ReturnErrorIfFull
        DropIfFull
        WarnAndDropIfFull
    }
    class PushError {
        <<enum>>
        QueueFull
        Closed
    }
    class PushHandleInner {
        high_prio_tx: mpsc::Sender<F>
        low_prio_tx: mpsc::Sender<F>
    }
    class PushHandle {
        +push_high_priority(frame: F): Result<(), PushError>
        +push_low_priority(frame: F): Result<(), PushError>
        +try_push(frame: F, priority: PushPriority, policy: PushPolicy): Result<(), PushError>
    }
    class PushQueues {
        +high_priority_rx: mpsc::Receiver<F>
        +low_priority_rx: mpsc::Receiver<F>
        +bounded(capacity: usize): (PushQueues, PushHandle)
    }

    PushHandleInner <.. PushHandle : contains
    PushQueues o-- PushHandle : bounded()
    PushHandle --> PushPriority
    PushHandle --> PushPolicy
    PushHandle --> PushError
    PushQueues --> PushHandle
    PushQueues --> FrameLike
    PushHandle --> FrameLike
```
Flow diagram for prioritized push queue operations and overflow policy handling

```mermaid
flowchart TD
    Producer[Producer]
    PushHandle[PushHandle]
    HighQueue[High Priority Queue]
    LowQueue[Low Priority Queue]
    Policy[PushPolicy]
    Error[PushError or Drop]

    Producer -->|push_high_priority / push_low_priority / try_push| PushHandle
    PushHandle -->|High| HighQueue
    PushHandle -->|Low| LowQueue
    PushHandle -->|If queue full| Policy
    Policy -->|ReturnErrorIfFull| Error
    Policy -->|DropIfFull| Error
    Policy -->|WarnAndDropIfFull| Error
```
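The policy branch in the diagram can be sketched in plain Rust. This is a minimal stand-in, not the PR's code: it uses `std::sync::mpsc::sync_channel` in place of the tokio channels, `eprintln!` in place of `log::warn!`, and the `try_push` helper plus the `PushPolicy` and `PushError` shapes are assumed from the diagram.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
enum PushError {
    QueueFull,
    Closed,
}

#[derive(Debug, Clone, Copy)]
enum PushPolicy {
    ReturnErrorIfFull,
    DropIfFull,
    WarnAndDropIfFull,
}

// Map a full queue to an error or a (possibly logged) drop; a closed
// queue is always an error, regardless of policy.
fn try_push<F>(tx: &SyncSender<F>, frame: F, policy: PushPolicy) -> Result<(), PushError> {
    match tx.try_send(frame) {
        Ok(()) => Ok(()),
        Err(TrySendError::Disconnected(_)) => Err(PushError::Closed),
        Err(TrySendError::Full(_)) => match policy {
            PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull),
            PushPolicy::DropIfFull => Ok(()),
            PushPolicy::WarnAndDropIfFull => {
                eprintln!("push queue full; dropping frame"); // stand-in for log::warn!
                Ok(())
            }
        },
    }
}

fn main() {
    let (tx, rx) = sync_channel::<u8>(1);
    assert_eq!(try_push(&tx, 1, PushPolicy::ReturnErrorIfFull), Ok(()));
    // Queue is now at capacity: the policy decides what happens next.
    assert_eq!(
        try_push(&tx, 2, PushPolicy::ReturnErrorIfFull),
        Err(PushError::QueueFull)
    );
    assert_eq!(try_push(&tx, 3, PushPolicy::DropIfFull), Ok(()));
    assert_eq!(rx.recv(), Ok(1)); // only the first frame reached the queue
    drop(rx);
    assert_eq!(try_push(&tx, 4, PushPolicy::DropIfFull), Err(PushError::Closed));
}
```

The key design point the sketch mirrors is that a disconnected receiver takes precedence over any drop policy: `Closed` is always surfaced as an error.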
Review skipped: auto incremental reviews are disabled on this repository.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Producer as Producer (PushHandle)
    participant PushQueues as PushQueues
    participant Consumer as Consumer (Receiver)
    Producer->>PushQueues: push_high_priority(frame)
    alt Queue not full
        PushQueues->>Consumer: frame (high priority)
        Producer-->>PushQueues: Ok
    else Queue full
        alt Policy: ReturnErrorIfFull
            Producer-->>PushQueues: Err(QueueFull)
        else Policy: DropIfFull or WarnAndDropIfFull
            PushQueues-->>Producer: Drop frame (optionally log)
        end
    end
    Producer->>PushQueues: push_low_priority(frame)
    alt Queue not full
        PushQueues->>Consumer: frame (low priority)
        Producer-->>PushQueues: Ok
    else Queue full
        alt Policy: ReturnErrorIfFull
            Producer-->>PushQueues: Err(QueueFull)
        else Policy: DropIfFull or WarnAndDropIfFull
            PushQueues-->>Producer: Drop frame (optionally log)
        end
    end
```
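The consumer side of the sequence above can be illustrated with a small synchronous sketch. This is assumed code, not from the PR: std channels stand in for the tokio receivers, and `drain_in_priority_order` is a hypothetical helper showing why high-priority frames come out first even when pushed last.

```rust
use std::sync::mpsc::{sync_channel, Receiver};

// Empty both queues, always draining the high-priority queue first.
fn drain_in_priority_order<T>(high_rx: &Receiver<T>, low_rx: &Receiver<T>) -> Vec<T> {
    let mut out = Vec::new();
    out.extend(high_rx.try_iter());
    out.extend(low_rx.try_iter());
    out
}

fn main() {
    let (high_tx, high_rx) = sync_channel(8);
    let (low_tx, low_rx) = sync_channel(8);

    // Producer pushes a low-priority frame first, then a high-priority one.
    low_tx.send("low").unwrap();
    high_tx.send("high").unwrap();

    // The consumer still sees the high-priority frame first.
    assert_eq!(drain_in_priority_order(&high_rx, &low_rx), vec!["high", "low"]);
}
```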
Hey @leynos - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `tests/push.rs:7` </location>
<code_context>
+async fn push_queues_preserve_priority_order() {
+ let (mut queues, handle) = PushQueues::bounded(1);
+
+ handle.push_low_priority(1u8).await.unwrap();
+ handle.push_high_priority(2u8).await.unwrap();
+
+ let high = queues.high_priority_rx.recv().await;
+ let low = queues.low_priority_rx.recv().await;
+
+ assert_eq!(high, Some(2));
+ assert_eq!(low, Some(1));
+}
+
</code_context>
<issue_to_address>
No test for PushError::Closed when receiver is dropped.
Add a test that drops the PushQueues or their receivers, then attempts to push a frame and checks that PushError::Closed is returned. This will improve coverage of error handling for closed queues.
Suggested implementation:
```rust
use wireframe::push::{PushPolicy, PushPriority, PushQueues, PushError};
```
```rust
#[tokio::test]
async fn push_queues_preserve_priority_order() {
    let (mut queues, handle) = PushQueues::bounded(1);

    handle.push_low_priority(1u8).await.unwrap();
    handle.push_high_priority(2u8).await.unwrap();

    let high = queues.high_priority_rx.recv().await;
    let low = queues.low_priority_rx.recv().await;

    assert_eq!(high, Some(2));
    assert_eq!(low, Some(1));
}
#[tokio::test]
async fn push_queues_error_on_closed() {
let (mut queues, handle) = PushQueues::bounded(1);
// Drop the high priority receiver
drop(queues.high_priority_rx);
// Try to push to high priority queue, should return PushError::Closed
let res = handle.push_high_priority(42u8).await;
assert!(matches!(res, Err(PushError::Closed)));
// Drop the low priority receiver
drop(queues.low_priority_rx);
// Try to push to low priority queue, should return PushError::Closed
let res = handle.push_low_priority(24u8).await;
assert!(matches!(res, Err(PushError::Closed)));
}
```
</issue_to_address>
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
- src/lib.rs (1 hunks)
- src/push.rs (1 hunks)
- tests/push.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build-test
- GitHub Check: Analyze (actions)
🔇 Additional comments (6)
src/lib.rs (1)
15-15: LGTM! Clean module export. The addition of the push module follows the established pattern and correctly exposes the new functionality as part of the public API.
tests/push.rs (1)
17-30: Excellent test coverage for push policy behaviour. This test effectively validates that `try_push` respects the `ReturnErrorIfFull` policy and correctly handles queue capacity limits. The test flow is logical and comprehensive.

src/push.rs (4)
17-19: Good use of marker trait pattern. The `FrameLike` trait effectively uses the marker trait pattern to document intent whilst providing type safety. The blanket implementation is idiomatic Rust.
21-46: Well-designed enums with comprehensive coverage. The three enums provide clear, well-documented interfaces:

- `PushPriority` covers the essential priority levels
- `PushPolicy` addresses common queue-full scenarios comprehensively
- `PushError` captures the relevant error conditions

All have appropriate derives and documentation.
48-55: Excellent handle design using Arc for shared ownership. The `PushHandle` design correctly uses `Arc` to enable cloning whilst sharing the underlying channels. This allows multiple producers to push to the same queues efficiently.
122-146: Solid implementation with intentional public field design. The `PushQueues` implementation is well-structured:

- Good use of the `#[must_use]` attribute on `bounded()`
- Clean channel creation and tuple return pattern
- Public receiver fields provide direct access, which appears to be an intentional design choice for flexibility

The implementation correctly creates separate channels for each priority level.
@sourcery-ai review
Hey @leynos - I've reviewed your changes - here's some feedback:
- The log call in `WarnAndDropIfFull` uses a literal `{priority:?}` instead of interpolating the variable; switch to `log::warn!("push queue full; dropping {:?} priority frame", priority)` or equivalent.
- Consider adding a unified `recv()` method on `PushQueues` that automatically yields high-priority frames before low, so consumers don't have to manage two receivers manually.
- It might be useful to allow separate capacities for the high- and low-priority channels rather than a single bound, giving more flexibility over resource allocation per priority.
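Sourcery's unified `recv()` idea could take roughly this shape. This is a hypothetical sketch, not the crate's API: in the real async code it would likely be a `tokio::select!` with the `biased;` ordering, but a synchronous `try_next` over std channels is enough to show the intended bias toward the high-priority queue.

```rust
use std::sync::mpsc::{sync_channel, Receiver};

// Hypothetical unified receive: always poll the high-priority queue first,
// falling back to the low-priority queue only when it yields nothing.
fn try_next<T>(high_rx: &Receiver<T>, low_rx: &Receiver<T>) -> Option<T> {
    high_rx.try_recv().ok().or_else(|| low_rx.try_recv().ok())
}

fn main() {
    let (high_tx, high_rx) = sync_channel(4);
    let (low_tx, low_rx) = sync_channel(4);

    low_tx.send(1u8).unwrap();
    high_tx.send(2u8).unwrap();

    assert_eq!(try_next(&high_rx, &low_rx), Some(2)); // high priority wins
    assert_eq!(try_next(&high_rx, &low_rx), Some(1));
    assert_eq!(try_next(&high_rx, &low_rx), None);
}
```

A consumer would then loop on one call instead of juggling two receivers.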
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The log call in `WarnAndDropIfFull` uses a literal `{priority:?}` instead of interpolating the variable; switch to `log::warn!("push queue full; dropping {:?} priority frame", priority)` or equivalent.
- Consider adding a unified `recv()` method on `PushQueues` that automatically yields high‐priority frames before low, so consumers don’t have to manage two receivers manually.
- It might be useful to allow separate capacities for the high‐ and low‐priority channels rather than a single bound, giving more flexibility over resource allocation per priority.
## Individual Comments
### Comment 1
<location> `src/push.rs:131` </location>
<code_context>
+ /// Create a new set of queues with the specified bound and return them along
+ /// with a [`PushHandle`] for producers.
+ #[must_use]
+ pub fn bounded(capacity: usize) -> (Self, PushHandle<F>) {
+ let (high_tx, high_rx) = mpsc::channel(capacity);
+ let (low_tx, low_rx) = mpsc::channel(capacity);
</code_context>
<issue_to_address>
Using the same capacity for both queues may not suit all use cases.
Consider allowing separate capacities for high and low priority queues to increase flexibility.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
/// Create a new set of queues with the specified bound and return them along
/// with a [`PushHandle`] for producers.
#[must_use]
pub fn bounded(capacity: usize) -> (Self, PushHandle<F>) {
let (high_tx, high_rx) = mpsc::channel(capacity);
let (low_tx, low_rx) = mpsc::channel(capacity);
=======
/// Create a new set of queues with the specified bounds for high and low priority,
/// and return them along with a [`PushHandle`] for producers.
///
/// # Parameters
/// - `high_capacity`: The capacity for the high priority queue.
/// - `low_capacity`: The capacity for the low priority queue.
#[must_use]
pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
>>>>>>> REPLACE
</suggested_fix>
@coderabbitai review

✅ Actions performed: review triggered.

@sourcery-ai review

@coderabbitai review

✅ Actions performed: review triggered.
Hey @leynos - I've reviewed your changes - here's some feedback:
- The `log::warn!` call in `try_push` uses `{priority:?}` without passing a named argument, which won’t compile—either switch to `log::warn!("... {:?} ...", priority)` or supply `priority=priority` explicitly.
- Consider adding a convenience method on `PushQueues` (e.g. `next()` or `recv()`) that yields the next frame by priority instead of exposing both receivers and forcing consumers to poll them manually.
- It could be useful to support different capacities for high- and low-priority queues in `bounded` so that resource allocation can be tuned independently per priority.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `log::warn!` call in `try_push` uses `{priority:?}` without passing a named argument, which won’t compile—either switch to `log::warn!("... {:?} ...", priority)` or supply `priority=priority` explicitly.
- Consider adding a convenience method on `PushQueues` (e.g. `next()` or `recv()`) that yields the next frame by priority instead of exposing both receivers and forcing consumers to poll them manually.
- It could be useful to support different capacities for high- and low-priority queues in `bounded` so that resource allocation can be tuned independently per priority.
@coderabbitai review

@sourcery-ai review

✅ Actions performed: review triggered.
Hey @leynos - I've reviewed your changes - here's some feedback:
- Fix the log formatting in the WarnAndDropIfFull branch — using `{priority:?}` without providing a named parameter will fail to compile; use positional formatting or supply `priority = priority`.
- Add tests for the DropIfFull and WarnAndDropIfFull policies to verify that silent drops and warning logs occur as intended.
- Consider adding a higher-level consumer abstraction (e.g. a unified Stream) over the two receivers to simplify pulling frames in priority order rather than exposing raw channels.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Fix the log formatting in the WarnAndDropIfFull branch — using `{priority:?}` without providing a named parameter will fail to compile; use positional formatting or supply `priority = priority`.
- Add tests for the DropIfFull and WarnAndDropIfFull policies to verify that silent drops and warning logs occur as intended.
- Consider adding a higher-level consumer abstraction (e.g. a unified Stream) over the two receivers to simplify pulling frames in priority order rather than exposing raw channels.
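A drop-policy test along the lines Sourcery suggests might look like this. Sketch only: a std `sync_channel` stands in for the bounded tokio queue, and the drop-on-full behaviour is emulated inline rather than calling the crate's `try_push`.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    let (tx, rx) = sync_channel::<u8>(1);
    tx.try_send(1).unwrap(); // queue is now at capacity

    // DropIfFull: a full queue is not an error; the overflow frame is discarded.
    let result: Result<(), ()> = match tx.try_send(2) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Ok(()), // policy swallows the overflow
        Err(TrySendError::Disconnected(_)) => Err(()),
    };
    assert_eq!(result, Ok(()));

    // Only the first frame ever reaches the consumer.
    assert_eq!(rx.try_iter().collect::<Vec<_>>(), vec![1]);
}
```

Asserting on the drained queue contents is what verifies the drop was silent rather than queued.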
Actionable comments posted: 2
🔭 Outside diff range comments (1)
docs/asynchronous-outbound-messaging-design.md (1)
120-126: Align closing parenthesis indentation
The closing parenthesis of thetry_pushsignature is not aligned with thepub fndeclaration, which can impact readability. Consider indenting the)to match the start of thepub fnline or moving it onto the same line for consistency.
```mermaid
classDiagram
    class FrameLike {
    }
    class PushPriority {
        <<enum>>
        High
        Low
    }
    class PushPolicy {
        <<enum>>
        ReturnErrorIfFull
        DropIfFull
        WarnAndDropIfFull
    }
    class PushError {
        <<enum>>
        QueueFull
        Closed
    }
    class PushHandleInner {
        high_prio_tx: mpsc::Sender<F>
        low_prio_tx: mpsc::Sender<F>
    }
    class PushHandle {
        +push_high_priority(frame: F): Result<(), PushError>
        +push_low_priority(frame: F): Result<(), PushError>
        +try_push(frame: F, priority: PushPriority, policy: PushPolicy): Result<(), PushError>
    }
    class PushQueues {
        +high_priority_rx: mpsc::Receiver<F>
        +low_priority_rx: mpsc::Receiver<F>
        +bounded(capacity: usize): (PushQueues, PushHandle)
    }

    PushHandleInner <.. PushHandle : contains
    PushQueues o-- PushHandle : bounded()
    PushHandle --> PushPriority
    PushHandle --> PushPolicy
    PushHandle --> PushError
    PushQueues --> PushHandle
    PushQueues --> FrameLike
    PushHandle --> FrameLike
```
🧹 Nitpick (assertive)
Enhance class diagram accuracy
The mermaid class diagram captures the main types and relationships well. As a refinement, you might:
- Annotate `PushHandle` and `PushQueues` with their generic parameter `<F>` to mirror the Rust signature.
- Update the `bounded()` relationship label to `bounded(capacity: usize)` so it matches the actual method signature.
These tweaks will keep the visual model in sync with the code.
🤖 Prompt for AI Agents
In docs/asynchronous-outbound-messaging-design.md between lines 129 and 172,
update the mermaid class diagram to annotate PushHandle and PushQueues with
their generic parameter <F> to reflect the Rust code accurately. Also, modify
the label on the bounded() relationship to bounded(capacity: usize) to match the
actual method signature. These changes will improve the diagram's accuracy and
alignment with the code.
```mermaid
flowchart TD
    Producer[Producer]
    PushHandle[PushHandle]
    HighQueue[High Priority Queue]
    LowQueue[Low Priority Queue]
    Policy[PushPolicy]
    Error[PushError or Drop]

    Producer -->|push_high_priority / push_low_priority / try_push| PushHandle
    PushHandle -->|High| HighQueue
    PushHandle -->|Low| LowQueue
    PushHandle -->|If queue full| Policy
    Policy -->|ReturnErrorIfFull| Error
    Policy -->|DropIfFull| Error
    Policy -->|WarnAndDropIfFull| Error
```
🧹 Nitpick (assertive)
Distinguish blocking vs non-blocking pushes
The flowchart currently conflates push_high_priority / push_low_priority (which await on full queues) with try_push (which applies a policy). For clarity, consider splitting into two branches:
- One showing the blocking behaviour of the high/low-priority methods.
- Another illustrating the policy decision tree for `try_push`.
This separation will make the push behaviours more explicit.
🤖 Prompt for AI Agents
In docs/asynchronous-outbound-messaging-design.md between lines 174 and 190, the
flowchart mixes blocking push methods (push_high_priority / push_low_priority)
with the non-blocking try_push method that applies a policy. To fix this, split
the flowchart into two branches: one branch showing the blocking behavior of
push_high_priority and push_low_priority leading directly to their respective
queues, and a separate branch illustrating the try_push method leading to the
policy decision tree and possible errors. This will clarify the different push
behaviors.
Summary
Testing
- `make lint`
- `make test`

https://chatgpt.com/codex/tasks/task_e_6858c9e5e76c8322bf3233f243d846be
Summary by Sourcery
Implement prioritized push queues to enable non-blocking, priority-aware message pushing with backpressure control
New Features:
- `push` module providing `PushQueues` and `PushHandle` for prioritized asynchronous frame dispatch
- Queue-full handling policies (`PushPolicy`)

Documentation:
Tests:
Summary by CodeRabbit