From cab501c92165bd64d707f1d905bb859d9c1fc9d3 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 13 Jun 2025 15:39:00 +0100 Subject: [PATCH 1/4] Restore worker count guard --- README.md | 2 ++ docs/rust-binary-router-library-design.md | 3 ++- src/extractor.rs | 33 +++++++++++++++++++++++ src/server.rs | 30 ++++++++++++++++++--- tests/server.rs | 13 +++++++++ 5 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 tests/server.rs diff --git a/README.md b/README.md index 6b5abce5..0b8fb79c 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,8 @@ WireframeServer::new(|| { .await ``` +By default, the number of worker tasks equals the number of CPU cores. + The builder supports methods like `frame_processor`, `route`, `app_data`, and `wrap` for middleware configuration【F:docs/rust-binary-router-library-design.md†L616-L704】. diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 259408a2..74d3a9df 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -669,7 +669,8 @@ similar to Actix Web's web::Data.21 `HttpServer`) would take the configured `WireframeApp` factory (a closure that creates an `App` instance per worker thread), bind to a network address, and manage incoming connections, task spawning for each connection, and the - overall server lifecycle. This would likely be built on Tokio's networking and + overall server lifecycle. The default number of worker tasks matches the + available CPU cores. This would likely be built on Tokio's networking and runtime primitives.18 This structural similarity to Actix Web is intentional. Developers familiar with diff --git a/src/extractor.rs b/src/extractor.rs index dcf0552b..5e8c5b0a 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -69,6 +69,17 @@ impl SharedState { /// assert_eq!(*state, 5); /// ``` #[must_use] + /// Creates a new `SharedState` instance wrapping the provided `Arc`. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use wireframe::extractor::SharedState; + /// let state = Arc::new(42); + /// let shared = SharedState::new(state.clone()); + /// assert_eq!(*shared, 42); + /// ``` pub fn new(inner: Arc) -> Self { Self(inner) } @@ -128,3 +139,25 @@ impl std::ops::Deref for SharedState { &self.0 } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn advance_consumes_bytes() { + let mut payload = Payload { data: b"hello" }; + payload.advance(2); + assert_eq!(payload.data, b"llo"); + payload.advance(10); + assert!(payload.data.is_empty()); + } + + #[test] + fn remaining_reports_length() { + let mut payload = Payload { data: b"abc" }; + assert_eq!(payload.remaining(), 3); + payload.advance(1); + assert_eq!(payload.remaining(), 2); + } +} diff --git a/src/server.rs b/src/server.rs index 2123b9e5..f9f0b7b9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -28,23 +28,47 @@ impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, { - /// Construct a new server using the supplied application factory. + /// Constructs a new `WireframeServer` using the provided application factory + /// closure. + /// + /// The default worker count equals the number of CPU cores. + /// + /// ```no_run + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let factory = || WireframeApp::new().unwrap(); + /// let server = WireframeServer::new(factory); + /// ``` #[must_use] pub fn new(factory: F) -> Self { Self { factory, listener: None, - workers: num_cpus::get(), + workers: num_cpus::get().max(1), } } - /// Set the number of worker tasks to spawn. + /// Set the number of worker tasks to spawn for the server. + /// + /// Ensures at least one worker is configured. + /// + /// # Examples + /// + /// ```ignore + /// let server = WireframeServer::new(factory).workers(4); + /// ``` #[must_use] pub fn workers(mut self, count: usize) -> Self { self.workers = count.max(1); self } + /// Get the configured worker count. + #[must_use] + pub const fn worker_count(&self) -> usize { + self.workers + } + /// Bind the server to the given address and create a listener. /// /// # Errors diff --git a/tests/server.rs b/tests/server.rs new file mode 100644 index 00000000..172016c1 --- /dev/null +++ b/tests/server.rs @@ -0,0 +1,13 @@ +use wireframe::{app::WireframeApp, server::WireframeServer}; + +#[test] +fn default_worker_count_is_positive() { + let server = WireframeServer::new(|| WireframeApp::new().unwrap()); + assert!(server.worker_count() >= 1); +} + +#[test] +fn workers_method_enforces_minimum() { + let server = WireframeServer::new(|| WireframeApp::new().unwrap()).workers(0); + assert_eq!(server.worker_count(), 1); +} From af3b7a0a07fde1bb56643af4aca32aa049c0b27e Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 13 Jun 2025 18:54:17 +0100 Subject: [PATCH 2/4] Use available_parallelism for worker default --- Cargo.lock | 21 ++------------------- Cargo.toml | 1 - src/middleware.rs | 2 +- src/server.rs | 7 ++++++- tests/server.rs | 17 +++++++++++++---- 5 files changed, 22 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2310555..a80969b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,12 +176,6 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" -[[package]] -name = "hermit-abi" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" - [[package]] name = "libc" version = "0.2.172" @@ -214,16 +208,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "object" version = "0.36.7" @@ -319,9 +303,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.102" +version = "2.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6397daf94fa90f058bd0fd88429dd9e5738999cca8d701813c80723add80462" +checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8" dependencies = [ "proc-macro2", "quote", @@ -469,7 +453,6 @@ dependencies = [ "bincode", "bytes", "futures", - "num_cpus", "serde", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 50b2132b..ce7ff1e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ serde = { version = "1", features = ["derive"] } bincode = "2" tokio = { version = "1", default-features = false, features = ["net", "signal", "rt-multi-thread", "macros", "sync", "time"] } futures = "0.3" -num_cpus = "^1" async-trait = "0.1" bytes = "1" diff --git a/src/middleware.rs b/src/middleware.rs index 113daf03..3b561f54 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -31,7 +31,7 @@ where /// /// # Errors /// - /// Propagates any error produced by the wrapped service. + /// Returns an error from the wrapped service if handling the request fails. #[must_use = "await the returned future"] pub async fn call(&self, req: ServiceRequest) -> Result { self.service.call(req).await diff --git a/src/server.rs b/src/server.rs index f9f0b7b9..b412135f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -33,6 +33,10 @@ where /// /// The default worker count equals the number of CPU cores. /// + /// # Panics + /// + /// Panics if the number of available CPUs cannot be determined. + /// /// ```no_run /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// @@ -44,7 +48,7 @@ where Self { factory, listener: None, - workers: num_cpus::get().max(1), + workers: std::thread::available_parallelism().unwrap().get(), } } @@ -64,6 +68,7 @@ where } /// Get the configured worker count. + #[inline] #[must_use] pub const fn worker_count(&self) -> usize { self.workers diff --git a/tests/server.rs b/tests/server.rs index 172016c1..6aa8e7ee 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1,13 +1,22 @@ use wireframe::{app::WireframeApp, server::WireframeServer}; #[test] -fn default_worker_count_is_positive() { - let server = WireframeServer::new(|| WireframeApp::new().unwrap()); - assert!(server.worker_count() >= 1); +fn default_worker_count_matches_cpu_count() { + let server = WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")); + let expected = std::thread::available_parallelism().unwrap().get(); + assert_eq!(server.worker_count(), expected); } #[test] fn workers_method_enforces_minimum() { - let server = WireframeServer::new(|| WireframeApp::new().unwrap()).workers(0); + let server = + WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")).workers(0); assert_eq!(server.worker_count(), 1); } + +#[test] +fn workers_accepts_large_values() { + let server = WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")) + .workers(128); + assert_eq!(server.worker_count(), 128); +} From 5436e1815bbcd9a70aa0b7f7c1d38c56bd99b154 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 13 Jun 2025 19:19:00 +0100 Subject: [PATCH 3/4] Handle unavailable CPU count --- README.md | 3 ++- docs/rust-binary-router-library-design.md | 3 ++- src/server.rs | 8 ++++---- tests/server.rs | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0b8fb79c..cd6fea42 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,8 @@ WireframeServer::new(|| { .await ``` -By default, the number of worker tasks equals the number of CPU cores. +By default, the number of worker tasks equals the number of CPU cores. If the +CPU count cannot be determined, the server falls back to a single worker. The builder supports methods like `frame_processor`, `route`, `app_data`, and `wrap` for middleware configuration【F:docs/rust-binary-router-library-design.md†L616-L704】. diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 74d3a9df..6a420c40 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -670,7 +670,8 @@ similar to Actix Web's web::Data.21 creates an `App` instance per worker thread), bind to a network address, and manage incoming connections, task spawning for each connection, and the overall server lifecycle. The default number of worker tasks matches the - available CPU cores. This would likely be built on Tokio's networking and + available CPU cores, falling back to a single worker if the count cannot be + determined. This would likely be built on Tokio's networking and runtime primitives.18 This structural similarity to Actix Web is intentional. Developers familiar with diff --git a/src/server.rs b/src/server.rs index b412135f..bb68499b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -33,9 +33,8 @@ where /// /// The default worker count equals the number of CPU cores. /// - /// # Panics - /// - /// Panics if the number of available CPUs cannot be determined. + /// If the CPU count cannot be determined, the server defaults to a single + /// worker. /// /// ```no_run /// use wireframe::{app::WireframeApp, server::WireframeServer}; @@ -45,10 +44,11 @@ where /// ``` #[must_use] pub fn new(factory: F) -> Self { + let workers = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get); Self { factory, listener: None, - workers: std::thread::available_parallelism().unwrap().get(), + workers, } } diff --git a/tests/server.rs b/tests/server.rs index 6aa8e7ee..a1917623 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -3,7 +3,7 @@ use wireframe::{app::WireframeApp, server::WireframeServer}; #[test] fn default_worker_count_matches_cpu_count() { let server = WireframeServer::new(|| WireframeApp::new().expect("WireframeApp::new failed")); - let expected = std::thread::available_parallelism().unwrap().get(); + let expected = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get); assert_eq!(server.worker_count(), expected); } From e9222a4503902180d55c7a1ce2182fecdb9fdb1d Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Fri, 13 Jun 2025 19:45:06 +0100 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`cod?= =?UTF-8?q?ex/ensure-minimum-of-one-worker-in-wireframeserver`=20(#19)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @leynos. * https://github.com/leynos/wireframe/pull/16#issuecomment-2970615031 The following files were modified: * `src/extractor.rs` * `src/middleware.rs` * `src/server.rs` Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- src/extractor.rs | 13 +++++++++++++ src/middleware.rs | 20 +++++++++++++++++++- src/server.rs | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/src/extractor.rs b/src/extractor.rs index 5e8c5b0a..d77d23bf 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -134,6 +134,19 @@ impl std::ops::Deref for SharedState { /// let state = Arc::new(42); /// let shared = SharedState::new(state.clone()); /// assert_eq!(*shared, 42); + /// Returns a reference to the inner shared state value. + /// + /// Allows transparent access to the wrapped state as if it were a reference to the underlying type. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use wireframe::extractor::SharedState; + /// + /// let state = Arc::new(42); + /// let shared = SharedState::new(state); + /// assert_eq!(*shared, 42); /// ``` fn deref(&self) -> &Self::Target { &self.0 diff --git a/src/middleware.rs b/src/middleware.rs index 3b561f54..a9e93eab 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -23,6 +23,22 @@ where /// Create a new [`Next`] wrapping the given service. #[inline] #[must_use] + /// Creates a new `Next` instance wrapping a reference to the given service. + /// + /// # Examples + /// + /// ``` + /// # use your_crate::{Next, Service, ServiceRequest}; + /// # struct MyService; + /// # impl Service for MyService { + /// # type Error = std::convert::Infallible; + /// # async fn call(&self, _req: ServiceRequest) -> Result { + /// # Ok(super::ServiceResponse) + /// # } + /// # } + /// let service = MyService; + /// let next = Next::new(&service); + /// ``` pub fn new(service: &'a S) -> Self { Self { service } } @@ -31,7 +47,9 @@ where /// /// # Errors /// - /// Returns an error from the wrapped service if handling the request fails. + /// Asynchronously invokes the wrapped service with the given request. + /// + /// Returns a response produced by the service, or an error if the service fails to handle the request. #[must_use = "await the returned future"] pub async fn call(&self, req: ServiceRequest) -> Result { self.service.call(req).await diff --git a/src/server.rs b/src/server.rs index bb68499b..b29679c7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -41,6 +41,19 @@ where /// /// let factory = || WireframeApp::new().unwrap(); /// let server = WireframeServer::new(factory); + /// Creates a new `WireframeServer` with the provided factory closure. + /// + /// The server is initialised with a default worker count equal to the number of available CPU cores, or 1 if this cannot be determined. The TCP listener is unset and must be configured with `bind` before running the server. + /// + /// # Panics + /// + /// Panics if the number of available CPUs cannot be determined and the fallback to 1 fails. + /// + /// # Examples + /// + /// ``` + /// let server = WireframeServer::new(|| WireframeApp::default()); + /// assert!(server.worker_count() >= 1); /// ``` #[must_use] pub fn new(factory: F) -> Self { @@ -62,6 +75,18 @@ where /// let server = WireframeServer::new(factory).workers(4); /// ``` #[must_use] + /// Sets the number of worker tasks to spawn, ensuring at least one worker is configured. + /// + /// Returns a new `WireframeServer` instance with the updated worker count. If `count` is less than 1, it defaults to 1. + /// + /// # Examples + /// + /// ``` + /// let server = WireframeServer::new(factory).workers(4); + /// assert_eq!(server.worker_count(), 4); + /// let server = server.workers(0); + /// assert_eq!(server.worker_count(), 1); + /// ``` pub fn workers(mut self, count: usize) -> Self { self.workers = count.max(1); self @@ -70,6 +95,14 @@ where /// Get the configured worker count. #[inline] #[must_use] + /// Returns the configured number of worker tasks for the server. + /// + /// # Examples + /// + /// ``` + /// let server = WireframeServer::new(factory); + /// assert!(server.worker_count() >= 1); + /// ``` pub const fn worker_count(&self) -> usize { self.workers }