From 8d5bed5eec8fcc61b54a8650244b025a5159c2fb Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 12:14:46 +0100 Subject: [PATCH 1/3] Add shared test helpers --- tests/common/mod.rs | 29 +++++++++++++++++++++++++++++ tests/preamble.rs | 13 ++++--------- tests/server.rs | 25 ++++++++++--------------- tests/world.rs | 6 +++++- 4 files changed, 48 insertions(+), 25 deletions(-) create mode 100644 tests/common/mod.rs diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 00000000..d58ec853 --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,29 @@ +//! Shared utilities for integration tests. +//! +//! Provides fixtures for a basic [`WireframeApp`] factory and an unused +//! local port. These helpers reduce duplication across test modules. + +use std::net::{Ipv4Addr, SocketAddr, TcpListener}; + +use rstest::fixture; +use wireframe::app::WireframeApp; + +#[fixture] +#[allow( + unused_braces, + reason = "rustc false positive for single line rstest fixtures" +)] +pub fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { + || WireframeApp::new().expect("WireframeApp::new failed") +} + +#[fixture] +#[allow( + unused_braces, + reason = "rustc false positive for single line rstest fixtures" +)] +pub fn unused_port() -> SocketAddr { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let listener = TcpListener::bind(addr).expect("failed to bind port"); + listener.local_addr().expect("failed to obtain local addr") +} diff --git a/tests/preamble.rs b/tests/preamble.rs index 79d5177f..fec9884f 100644 --- a/tests/preamble.rs +++ b/tests/preamble.rs @@ -4,7 +4,9 @@ use std::io; use bincode::error::DecodeError; use futures::future::BoxFuture; -use rstest::{fixture, rstest}; +mod common; +use common::{factory, unused_port}; +use rstest::rstest; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, duplex}, net::TcpStream, @@ -34,11 +36,6 @@ impl HotlinePreamble { } } -#[fixture] -fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { - || WireframeApp::new().expect("WireframeApp::new failed") -} - /// Create a server configured with `HotlinePreamble` handlers. fn server_with_handlers( factory: F, @@ -68,9 +65,7 @@ where Fut: std::future::Future, B: FnOnce(std::net::SocketAddr) -> Fut, { - let server = server - .bind("127.0.0.1:0".parse().expect("hard-coded socket addr")) - .expect("bind"); + let server = server.bind(unused_port()).expect("bind"); let addr = server.local_addr().expect("addr"); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let handle = tokio::spawn(async move { diff --git a/tests/server.rs b/tests/server.rs index 14559199..e0d59651 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1,31 +1,31 @@ //! Tests for [`WireframeServer`] configuration. -use wireframe::{app::WireframeApp, server::WireframeServer}; +mod common; +use common::{factory, unused_port}; +use wireframe::server::WireframeServer; #[test] fn default_worker_count_matches_cpu_count() { - let server = WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")); + let server = WireframeServer::new(factory()); let expected = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get); assert_eq!(server.worker_count(), expected); } #[test] fn default_workers_at_least_one() { - let server = WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")); + let server = WireframeServer::new(factory()); assert!(server.worker_count() >= 1); } #[test] fn workers_method_enforces_minimum() { - let server = - WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")).workers(0); + let server = WireframeServer::new(factory()).workers(0); assert_eq!(server.worker_count(), 1); } #[test] fn workers_accepts_large_values() { - let server = WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")) - .workers(128); + let server = WireframeServer::new(factory()).workers(128); assert_eq!(server.worker_count(), 128); } @@ -39,15 +39,10 @@ async fn readiness_receiver_dropped() { time::{Duration, sleep}, }; - let factory = || WireframeApp::new().expect("WireframeApp::new failed"); - let server = WireframeServer::new(factory) + let server = WireframeServer::new(factory()) .workers(1) - .bind( - "127.0.0.1:0" - .parse() - .expect("hard-coded socket address must be valid"), - ) - .expect("bind failed"); + .bind(unused_port()) + .unwrap(); let addr = server.local_addr().expect("local addr missing"); // Create channel and immediately drop receiver to force send failure diff --git a/tests/world.rs b/tests/world.rs index f1a22e98..dcf79005 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -9,6 +9,10 @@ use cucumber::World; use tokio::{net::TcpStream, sync::oneshot}; use wireframe::{app::WireframeApp, server::WireframeServer}; +#[path = "common/mod.rs"] +mod common; +use common::unused_port; + #[derive(Debug)] struct PanicServer { addr: SocketAddr, @@ -26,7 +30,7 @@ impl PanicServer { }; let server = WireframeServer::new(factory) .workers(1) - .bind("127.0.0.1:0".parse().expect("Failed to parse address")) + .bind(unused_port()) .expect("bind"); let addr = server.local_addr().expect("Failed to get server address"); From 681f080d9dd3f96b3cb8af013066bf4d629a89b3 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 23:11:10 +0100 Subject: [PATCH 2/3] Add bind_listener and fix port helper --- src/connection.rs | 32 ++++++++++++++++++++++++-------- src/push.rs | 8 ++++++-- src/server.rs | 15 ++++++++++++++- tests/common/mod.rs | 14 ++++++++++---- tests/preamble.rs | 6 ++++-- tests/server.rs | 6 ++++-- tests/world.rs | 10 ++++++---- 7 files changed, 68 insertions(+), 23 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index ae5c3b8d..eb43546b 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -43,7 +43,9 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } +pub fn active_connection_count() -> u64 { + ACTIVE_CONNECTIONS.load(Ordering::Relaxed) +} use crate::{ fairness::Fairness, @@ -188,11 +190,15 @@ where pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness.set_config(fairness); } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { self.response = stream; } + pub fn set_response(&mut self, stream: Option>) { + self.response = stream; + } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } + pub fn shutdown_token(&self) -> CancellationToken { + self.shutdown.clone() + } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -452,11 +458,15 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + async fn wait_shutdown(token: CancellationToken) { + token.cancelled_owned().await; + } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { + rx.recv().await + } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -539,11 +549,17 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } + fn is_active(&self) -> bool { + matches!(self.run_state, RunState::Active) + } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } + fn is_shutting_down(&self) -> bool { + matches!(self.run_state, RunState::ShuttingDown) + } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } + fn is_done(&self) -> bool { + matches!(self.run_state, RunState::Finished) + } } diff --git a/src/push.rs b/src/push.rs index b0577103..e2a7633e 100644 --- a/src/push.rs +++ b/src/push.rs @@ -106,7 +106,9 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } + pub(crate) fn from_arc(arc: Arc>) -> Self { + Self(arc) + } /// Internal helper to push a frame with the requested priority. /// @@ -262,7 +264,9 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } + pub(crate) fn downgrade(&self) -> Weak> { + Arc::downgrade(&self.0) + } } /// Receiver ends of the push queues stored by the connection actor. diff --git a/src/server.rs b/src/server.rs index e5b1d4b8..41eee9bb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,7 +229,9 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { self.workers } + pub const fn worker_count(&self) -> usize { + self.workers + } /// Get the socket address the server is bound to, if available. #[must_use] @@ -274,6 +276,17 @@ where Ok(self) } + /// Bind the server to an existing standard TCP listener. + /// + /// # Errors + /// Returns an [`io::Error`] if configuring the listener fails. + pub fn bind_listener(mut self, listener: StdTcpListener) -> io::Result { + listener.set_nonblocking(true)?; + let listener = TcpListener::from_std(listener)?; + self.listener = Some(Arc::new(listener)); + Ok(self) + } + /// Run the server until a shutdown signal is received. /// /// Each worker accepts connections concurrently and would diff --git a/tests/common/mod.rs b/tests/common/mod.rs index d58ec853..51f04b8b 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -3,7 +3,13 @@ //! Provides fixtures for a basic [`WireframeApp`] factory and an unused //! local port. These helpers reduce duplication across test modules. -use std::net::{Ipv4Addr, SocketAddr, TcpListener}; +use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; + +/// Create a TCP listener bound to a free local port. +pub fn unused_listener() -> StdTcpListener { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + StdTcpListener::bind(addr).expect("failed to bind port") +} use rstest::fixture; use wireframe::app::WireframeApp; @@ -23,7 +29,7 @@ pub fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { reason = "rustc false positive for single line rstest fixtures" )] pub fn unused_port() -> SocketAddr { - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = TcpListener::bind(addr).expect("failed to bind port"); - listener.local_addr().expect("failed to obtain local addr") + unused_listener() + .local_addr() + .expect("failed to obtain local addr") } diff --git a/tests/preamble.rs b/tests/preamble.rs index fec9884f..6eb315d3 100644 --- a/tests/preamble.rs +++ b/tests/preamble.rs @@ -5,7 +5,7 @@ use std::io; use bincode::error::DecodeError; use futures::future::BoxFuture; mod common; -use common::{factory, unused_port}; +use common::{factory, unused_listener}; use rstest::rstest; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, duplex}, @@ -65,7 +65,9 @@ where Fut: std::future::Future, B: FnOnce(std::net::SocketAddr) -> Fut, { - let server = server.bind(unused_port()).expect("bind"); + let listener = unused_listener(); + let _addr = listener.local_addr().expect("addr"); + let server = server.bind_listener(listener).expect("bind"); let addr = server.local_addr().expect("addr"); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let handle = tokio::spawn(async move { diff --git a/tests/server.rs b/tests/server.rs index e0d59651..daa13cdc 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1,7 +1,7 @@ //! Tests for [`WireframeServer`] configuration. mod common; -use common::{factory, unused_port}; +use common::{factory, unused_listener}; use wireframe::server::WireframeServer; #[test] @@ -39,9 +39,11 @@ async fn readiness_receiver_dropped() { time::{Duration, sleep}, }; + let listener = unused_listener(); + let _addr = listener.local_addr().unwrap(); let server = WireframeServer::new(factory()) .workers(1) - .bind(unused_port()) + .bind_listener(listener) .unwrap(); let addr = server.local_addr().expect("local addr missing"); diff --git a/tests/world.rs b/tests/world.rs index dcf79005..4c940cc4 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -11,7 +11,7 @@ use wireframe::{app::WireframeApp, server::WireframeServer}; #[path = "common/mod.rs"] mod common; -use common::unused_port; +use common::unused_listener; #[derive(Debug)] struct PanicServer { @@ -28,11 +28,11 @@ impl PanicServer { .on_connection_setup(|| async { panic!("boom") }) .expect("Failed to set connection setup callback") }; + let listener = unused_listener(); let server = WireframeServer::new(factory) .workers(1) - .bind(unused_port()) + .bind_listener(listener) .expect("bind"); - let addr = server.local_addr().expect("Failed to get server address"); let (tx_shutdown, rx_shutdown) = oneshot::channel(); let (tx_ready, rx_ready) = oneshot::channel(); @@ -84,7 +84,9 @@ impl PanicWorld { /// /// # Panics /// Panics if binding the server fails or the server task fails. - pub async fn start_panic_server(&mut self) { self.server.replace(PanicServer::spawn().await); } + pub async fn start_panic_server(&mut self) { + self.server.replace(PanicServer::spawn().await); + } /// Connect to the running server once. /// From 802519ae8f7ced4f7ffea23d461234f3c0508c74 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 18:06:15 +0100 Subject: [PATCH 3/3] Format connection utilities --- src/connection.rs | 32 ++++++++------------------------ src/push.rs | 8 ++------ src/server.rs | 4 +--- tests/world.rs | 4 +--- 4 files changed, 12 insertions(+), 36 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index eb43546b..ae5c3b8d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -43,9 +43,7 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { - ACTIVE_CONNECTIONS.load(Ordering::Relaxed) -} +pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ fairness::Fairness, @@ -190,15 +188,11 @@ where pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness.set_config(fairness); } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { - self.response = stream; - } + pub fn set_response(&mut self, stream: Option>) { self.response = stream; } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { - self.shutdown.clone() - } + pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -458,15 +452,11 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { - token.cancelled_owned().await; - } + async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { - rx.recv().await - } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -549,17 +539,11 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { - matches!(self.run_state, RunState::Active) - } + fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { - matches!(self.run_state, RunState::ShuttingDown) - } + fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { - matches!(self.run_state, RunState::Finished) - } + fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } } diff --git a/src/push.rs b/src/push.rs index e2a7633e..b0577103 100644 --- a/src/push.rs +++ b/src/push.rs @@ -106,9 +106,7 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { - Self(arc) - } + pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } /// Internal helper to push a frame with the requested priority. /// @@ -264,9 +262,7 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { - Arc::downgrade(&self.0) - } + pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } } /// Receiver ends of the push queues stored by the connection actor. diff --git a/src/server.rs b/src/server.rs index 41eee9bb..dbc650f5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,9 +229,7 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { - self.workers - } + pub const fn worker_count(&self) -> usize { self.workers } /// Get the socket address the server is bound to, if available. #[must_use] diff --git a/tests/world.rs b/tests/world.rs index 4c940cc4..e1814277 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -84,9 +84,7 @@ impl PanicWorld { /// /// # Panics /// Panics if binding the server fails or the server task fails. - pub async fn start_panic_server(&mut self) { - self.server.replace(PanicServer::spawn().await); - } + pub async fn start_panic_server(&mut self) { self.server.replace(PanicServer::spawn().await); } /// Connect to the running server once. ///