Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 23 additions & 43 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ impl Drop for ActiveConnection {

/// Return the current number of active connections.
#[must_use]
pub fn active_connection_count() -> u64 {
ACTIVE_CONNECTIONS.load(Ordering::Relaxed)
}
pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) }

use crate::{
hooks::{ConnectionContext, ProtocolHooks},
Expand Down Expand Up @@ -190,20 +188,14 @@ where
}

/// Replace the fairness configuration.
pub fn set_fairness(&mut self, fairness: FairnessConfig) {
self.fairness = fairness;
}
pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; }

/// Set or replace the current streaming response.
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) {
self.response = stream;
}
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) { self.response = stream; }

/// Get a clone of the shutdown token used by the actor.
#[must_use]
pub fn shutdown_token(&self) -> CancellationToken {
self.shutdown.clone()
}
pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() }

/// Drive the actor until all sources are exhausted or shutdown is triggered.
///
Expand Down Expand Up @@ -410,19 +402,19 @@ where
self.high_start = Some(Instant::now());
}

if self.should_yield_to_low_priority() {
if let Some(rx) = &mut self.low_rx {
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
self.low_rx = None;
state.mark_closed();
}
if self.should_yield_to_low_priority()
&& let Some(rx) = &mut self.low_rx
{
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
self.low_rx = None;
state.mark_closed();
}
}
}
Expand All @@ -441,9 +433,7 @@ where
}

/// Reset counters after processing a low-priority frame.
fn after_low(&mut self) {
self.reset_high_counter();
}
fn after_low(&mut self) { self.reset_high_counter(); }

/// Clear the burst counter and associated timestamp.
fn reset_high_counter(&mut self) {
Expand Down Expand Up @@ -486,15 +476,11 @@ where

/// Await cancellation on the provided shutdown token.
#[inline]
async fn wait_shutdown(token: CancellationToken) {
token.cancelled_owned().await;
}
async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; }

/// Receive the next frame from a push queue.
#[inline]
async fn recv_push(rx: &mut mpsc::Receiver<F>) -> Option<F> {
rx.recv().await
}
async fn recv_push(rx: &mut mpsc::Receiver<F>) -> Option<F> { rx.recv().await }

/// Poll `f` if `opt` is `Some`, returning `None` otherwise.
#[expect(
Expand Down Expand Up @@ -577,17 +563,11 @@ impl ActorState {
}

/// Returns `true` while the actor is actively processing sources.
fn is_active(&self) -> bool {
matches!(self.run_state, RunState::Active)
}
fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) }

/// Returns `true` once shutdown has begun.
fn is_shutting_down(&self) -> bool {
matches!(self.run_state, RunState::ShuttingDown)
}
fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) }

/// Returns `true` when all sources have finished.
fn is_done(&self) -> bool {
matches!(self.run_state, RunState::Finished)
}
fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) }
}
16 changes: 6 additions & 10 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ pub(crate) struct PushHandleInner<F> {
pub struct PushHandle<F>(Arc<PushHandleInner<F>>);

impl<F: FrameLike> PushHandle<F> {
pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self {
Self(arc)
}
pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self { Self(arc) }

/// Internal helper to push a frame with the requested priority.
///
Expand Down Expand Up @@ -255,9 +253,7 @@ impl<F: FrameLike> PushHandle<F> {
}

/// Downgrade to a `Weak` reference for storage in a registry.
pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> {
Arc::downgrade(&self.0)
}
pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> { Arc::downgrade(&self.0) }
}

/// Receiver ends of the push queues stored by the connection actor.
Expand Down Expand Up @@ -386,10 +382,10 @@ impl<F: FrameLike> PushQueues<F> {
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
if let Some(r) = rate {
if r == 0 || r > MAX_PUSH_RATE {
return Err(PushConfigError::InvalidRate(r));
}
if let Some(r) = rate
&& (r == 0 || r > MAX_PUSH_RATE)
Comment on lines +385 to +386
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Complex Conditional
PushQueues.bounded_with_rate_dlq has 1 complex conditionals with 2 branches, threshold = 2

Suppress

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leynos I'll suggest a fix for the complex conditional issue by extracting the rate validation logic into a predicate function, following your coding guidelines.

Suggested Fix

Extract the rate validation logic into a predicate function:

/// Check if a rate value is invalid (zero or exceeds maximum).
fn is_invalid_rate(rate: usize) -> bool {
    rate == 0 || rate > MAX_PUSH_RATE
}

Then replace the complex conditional with:

if let Some(r) = rate
    && is_invalid_rate(r)
{
    return Err(PushConfigError::InvalidRate(r));
}

AI Coding Agent Prompt

In the file src/push.rs, refactor the rate validation logic in the `bounded_with_rate_dlq` method to comply with the complexity guidelines:

1. Add a new private function `is_invalid_rate` that takes a `usize` parameter and returns `bool`
2. The function should return `true` if the rate is 0 or greater than `MAX_PUSH_RATE`
3. Add a doc comment explaining the function's purpose
4. Place the function before the `PushQueues` impl block
5. Replace the complex conditional `r == 0 || r > MAX_PUSH_RATE` with a call to `is_invalid_rate(r)`
6. Keep the existing let-guard pattern structure intact

This refactoring will reduce the cyclomatic complexity while maintaining the same validation behavior.

{
return Err(PushConfigError::InvalidRate(r));
}
let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
Expand Down
12 changes: 5 additions & 7 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ where
/// ```
#[inline]
#[must_use]
pub const fn worker_count(&self) -> usize {
self.workers
}
pub const fn worker_count(&self) -> usize { self.workers }

/// Get the socket address the server is bound to, if available.
#[must_use]
Expand Down Expand Up @@ -509,10 +507,10 @@ async fn process_stream<F, T>(
let peer_addr = stream.peer_addr().ok();
match read_preamble::<_, T>(&mut stream).await {
Ok((preamble, leftover)) => {
if let Some(handler) = on_success.as_ref() {
if let Err(e) = handler(&preamble, &mut stream).await {
tracing::error!(error = ?e, ?peer_addr, "preamble callback error");
}
if let Some(handler) = on_success.as_ref()
&& let Err(e) = handler(&preamble, &mut stream).await
{
tracing::error!(error = ?e, ?peer_addr, "preamble callback error");
}
let stream = RewindStream::new(leftover, stream);
// Hand the connection to the application for processing.
Expand Down