Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/metadata_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl FrameMetadata for HeaderSerializer {
struct Ping;

#[derive(bincode::Decode, bincode::Encode)]
#[expect(dead_code, reason = "used only in documentation example")]
#[allow(dead_code, reason = "used only in documentation example")]
struct Pong;

#[tokio::main]
Expand Down
5 changes: 4 additions & 1 deletion examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ const PING_ID: u32 = 1;
///
/// The middleware chain generates the actual response, so this
/// handler intentionally performs no work.
#[allow(clippy::unused_async)]
#[allow(
clippy::unused_async,
reason = "example handler intentionally performs no work"
)]
Comment on lines +43 to +46
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace #[allow] with #[expect] to comply with coding guidelines.

The coding guidelines explicitly forbid #[allow] directives. Use #[expect] instead with the same reason.

Apply this diff to fix the compliance issue:

-#[allow(
-    clippy::unused_async,
-    reason = "example handler intentionally performs no work"
-)]
+#[expect(
+    clippy::unused_async,
+    reason = "example handler intentionally performs no work"
+)]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[allow(
clippy::unused_async,
reason = "example handler intentionally performs no work"
)]
#[expect(
clippy::unused_async,
reason = "example handler intentionally performs no work"
)]
🤖 Prompt for AI Agents
In examples/ping_pong.rs around lines 43 to 46, replace the attribute macro
#[allow(clippy::unused_async, reason = "example handler intentionally performs
no work")] with #[expect(clippy::unused_async, reason = "example handler
intentionally performs no work")] to comply with coding guidelines that forbid
#[allow] directives.

async fn ping_handler() {}

struct PongMiddleware;
Expand Down
135 changes: 93 additions & 42 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl Drop for ActiveConnection {

/// Return the current number of active connections.
#[must_use]
pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) }
pub fn active_connection_count() -> u64 {
ACTIVE_CONNECTIONS.load(Ordering::Relaxed)
}

use crate::{
hooks::{ConnectionContext, ProtocolHooks},
Expand All @@ -65,6 +67,18 @@ enum Event<F, E> {
Idle,
}

/// Context for processing frames during actor execution.
struct ProcessContext<'a, F> {
state: &'a mut ActorState,
out: &'a mut Vec<F>,
}

impl<'a, F> ProcessContext<'a, F> {
fn new(state: &'a mut ActorState, out: &'a mut Vec<F>) -> Self {
Self { state, out }
}
}

/// Configuration controlling fairness when draining push queues.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
Expand Down Expand Up @@ -188,14 +202,20 @@ where
}

/// Replace the fairness configuration.
pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; }
pub fn set_fairness(&mut self, fairness: FairnessConfig) {
self.fairness = fairness;
}

/// Set or replace the current streaming response.
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) { self.response = stream; }
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) {
self.response = stream;
}

/// Get a clone of the shutdown token used by the actor.
#[must_use]
pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() }
pub fn shutdown_token(&self) -> CancellationToken {
self.shutdown.clone()
}

/// Drive the actor until all sources are exhausted or shutdown is triggered.
///
Expand Down Expand Up @@ -283,11 +303,13 @@ where
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
match self.next_event(state).await {
Event::Shutdown => self.process_shutdown(state),
Event::High(res) => self.process_high(res, state, out),
Event::Low(res) => self.process_low(res, state, out),
Event::Response(res) => self.process_response(res, state, out)?,
let mut ctx = ProcessContext::new(state, out);

match self.next_event(ctx.state).await {
Event::Shutdown => self.process_shutdown(ctx.state),
Event::High(res) => self.process_high(res, &mut ctx),
Event::Low(res) => self.process_low(res, &mut ctx),
Event::Response(res) => self.process_response(res, ctx.state, ctx.out)?,
Event::Idle => {}
}

Expand All @@ -301,23 +323,40 @@ where
}

/// Handle the result of polling the high-priority queue.
fn process_high(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
if let Some(frame) = res {
self.process_frame_common(frame, out);
self.after_high(out, state);
} else {
Self::handle_closed_receiver(&mut self.high_rx, state);
self.reset_high_counter();
}
fn process_high(&mut self, res: Option<F>, ctx: &mut ProcessContext<F>) {
self.process_push(res, ctx, Self::after_high, |this, state| {
Self::handle_closed_receiver(&mut this.high_rx, state);
this.reset_high_counter();
});
}

/// Handle the result of polling the low-priority queue.
fn process_low(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
if let Some(frame) = res {
self.process_frame_common(frame, out);
self.after_low();
} else {
Self::handle_closed_receiver(&mut self.low_rx, state);
fn process_low(&mut self, res: Option<F>, ctx: &mut ProcessContext<F>) {
self.process_push(
res,
ctx,
|this, _, _| this.after_low(),
|this, state| Self::handle_closed_receiver(&mut this.low_rx, state),
);
}

/// Handle the result of polling a push queue.
fn process_push<OnSome, OnNone>(
&mut self,
res: Option<F>,
ctx: &mut ProcessContext<F>,
on_some: OnSome,
on_none: OnNone,
) where
OnSome: FnOnce(&mut Self, &mut Vec<F>, &mut ActorState),
OnNone: FnOnce(&mut Self, &mut ActorState),
{
match res {
Some(frame) => {
self.process_frame_common(frame, ctx.out);
on_some(self, ctx.out, ctx.state);
}
None => on_none(self, ctx.state),
}
}

Expand Down Expand Up @@ -374,19 +413,19 @@ where
self.high_start = Some(Instant::now());
}

if self.should_yield_to_low_priority()
&& let Some(rx) = &mut self.low_rx
{
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
self.low_rx = None;
state.mark_closed();
if self.should_yield_to_low_priority() {
if let Some(rx) = &mut self.low_rx {
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
self.low_rx = None;
state.mark_closed();
}
}
}
}
Expand All @@ -405,7 +444,9 @@ where
}

/// Reset counters after processing a low-priority frame.
fn after_low(&mut self) { self.reset_high_counter(); }
fn after_low(&mut self) {
self.reset_high_counter();
}

/// Clear the burst counter and associated timestamp.
fn reset_high_counter(&mut self) {
Expand Down Expand Up @@ -448,11 +489,15 @@ where

/// Await cancellation on the provided shutdown token.
#[inline]
async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; }
async fn wait_shutdown(token: CancellationToken) {
token.cancelled_owned().await;
}

/// Receive the next frame from a push queue.
#[inline]
async fn recv_push(rx: &mut mpsc::Receiver<F>) -> Option<F> { rx.recv().await }
async fn recv_push(rx: &mut mpsc::Receiver<F>) -> Option<F> {
rx.recv().await
}

/// Poll `f` if `opt` is `Some`, returning `None` otherwise.
#[expect(
Expand Down Expand Up @@ -535,11 +580,17 @@ impl ActorState {
}

/// Returns `true` while the actor is actively processing sources.
fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) }
fn is_active(&self) -> bool {
matches!(self.run_state, RunState::Active)
}

/// Returns `true` once shutdown has begun.
fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) }
fn is_shutting_down(&self) -> bool {
matches!(self.run_state, RunState::ShuttingDown)
}

/// Returns `true` when all sources have finished.
fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) }
fn is_done(&self) -> bool {
matches!(self.run_state, RunState::Finished)
}
}
62 changes: 47 additions & 15 deletions src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@ pub struct FrameContainer<F> {
impl<F> FrameContainer<F> {
/// Create a new container holding `frame` bytes.
#[must_use]
pub fn new(frame: F) -> Self { Self { frame } }
pub fn new(frame: F) -> Self {
Self { frame }
}

/// Borrow the inner frame data.
#[must_use]
pub fn frame(&self) -> &F { &self.frame }
pub fn frame(&self) -> &F {
&self.frame
}

/// Mutable access to the frame data.
#[must_use]
pub fn frame_mut(&mut self) -> &mut F { &mut self.frame }
pub fn frame_mut(&mut self) -> &mut F {
&mut self.frame
}

/// Consume the container, returning the frame.
#[must_use]
pub fn into_inner(self) -> F { self.frame }
pub fn into_inner(self) -> F {
self.frame
}
}

/// Incoming request wrapper passed through middleware.
Expand All @@ -49,15 +57,21 @@ impl ServiceRequest {

/// Borrow the underlying frame bytes.
#[must_use]
pub fn frame(&self) -> &[u8] { self.inner.frame().as_slice() }
pub fn frame(&self) -> &[u8] {
self.inner.frame().as_slice()
}

/// Mutable access to the inner frame bytes.
#[must_use]
pub fn frame_mut(&mut self) -> &mut Vec<u8> { self.inner.frame_mut() }
pub fn frame_mut(&mut self) -> &mut Vec<u8> {
self.inner.frame_mut()
}

/// Consume the request, returning the inner frame bytes.
#[must_use]
pub fn into_inner(self) -> Vec<u8> { self.inner.into_inner() }
pub fn into_inner(self) -> Vec<u8> {
self.inner.into_inner()
}
}

/// Response produced by a handler or middleware.
Expand All @@ -77,15 +91,21 @@ impl ServiceResponse {

/// Borrow the inner frame bytes.
#[must_use]
pub fn frame(&self) -> &[u8] { self.inner.frame().as_slice() }
pub fn frame(&self) -> &[u8] {
self.inner.frame().as_slice()
}

/// Mutable access to the response frame bytes.
#[must_use]
pub fn frame_mut(&mut self) -> &mut Vec<u8> { self.inner.frame_mut() }
pub fn frame_mut(&mut self) -> &mut Vec<u8> {
self.inner.frame_mut()
}

/// Consume the response, yielding the raw frame bytes.
#[must_use]
pub fn into_inner(self) -> Vec<u8> { self.inner.into_inner() }
pub fn into_inner(self) -> Vec<u8> {
self.inner.into_inner()
}
}

/// Continuation used by middleware to call the next service in the chain.
Expand Down Expand Up @@ -120,7 +140,9 @@ where
/// ```
#[inline]
#[must_use]
pub fn new(service: &'a S) -> Self { Self { service } }
pub fn new(service: &'a S) -> Self {
Self { service }
}

/// Call the next service with the provided request.
///
Expand Down Expand Up @@ -157,7 +179,11 @@ where

/// Create a new middleware service wrapping `service`.
#[inline]
#[allow(clippy::inline_fn_without_body, unused_attributes)]
#[allow(
clippy::inline_fn_without_body,
unused_attributes,
reason = "future-proof attribute and inline hint without body"
)]
Comment on lines +182 to +186
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace #[allow] with #[expect] to comply with coding guidelines.

The coding guidelines explicitly forbid #[allow] directives. Use #[expect] instead with the same reason.

Apply this diff to fix the compliance issue:

-#[allow(
-    clippy::inline_fn_without_body,
-    unused_attributes,
-    reason = "future-proof attribute and inline hint without body"
-)]
+#[expect(
+    clippy::inline_fn_without_body,
+    unused_attributes,
+    reason = "future-proof attribute and inline hint without body"
+)]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[allow(
clippy::inline_fn_without_body,
unused_attributes,
reason = "future-proof attribute and inline hint without body"
)]
#[expect(
clippy::inline_fn_without_body,
unused_attributes,
reason = "future-proof attribute and inline hint without body"
)]
🤖 Prompt for AI Agents
In src/middleware.rs around lines 182 to 186, replace the #[allow] attribute
with #[expect] while keeping the same lint names and reason string to comply
with coding guidelines that forbid #[allow] directives.

#[must_use = "use the returned middleware service"]
async fn transform(&self, service: S) -> Self::Output;
}
Expand All @@ -173,7 +199,9 @@ pub struct FromFn<F> {

impl<F> FromFn<F> {
/// Construct middleware from the provided asynchronous function.
pub fn new(f: F) -> Self { Self { f } }
pub fn new(f: F) -> Self {
Self { f }
}
}

/// Convenience constructor to build middleware from an async function.
Expand Down Expand Up @@ -202,7 +230,9 @@ impl<F> FromFn<F> {
/// # }
/// let mw = from_fn(logging);
/// ```
pub fn from_fn<F>(f: F) -> FromFn<F> { FromFn::new(f) }
pub fn from_fn<F>(f: F) -> FromFn<F> {
FromFn::new(f)
}

/// Service wrapper that applies a middleware function to requests.
///
Expand Down Expand Up @@ -283,7 +313,9 @@ impl<E: Packet> HandlerService<E> {

/// Returns the route identifier associated with this service.
#[must_use]
pub const fn id(&self) -> u32 { self.id }
pub const fn id(&self) -> u32 {
self.id
}
}

struct RouteService<E: Packet> {
Expand Down
Loading