diff --git a/Cargo.lock b/Cargo.lock index 47952764..a78fc8f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,42 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + [[package]] name = "bincode" version = "2.0.1" @@ -22,6 +58,176 @@ dependencies = [ "virtue", ] +[[package]] +name = "cfg-if" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" + +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + +[[package]] +name = "libc" +version = "0.2.172" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" + +[[package]] +name = "memchr" +version = "2.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.59.0", +] + +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.95" @@ -40,6 +246,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rustc-demangle" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" + [[package]] name = "serde" version = "1.0.219" @@ -60,6 +272,34 @@ dependencies = [ "syn", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "syn" version = "2.0.102" @@ -71,6 +311,33 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tokio" +version = "1.45.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +dependencies = [ + "backtrace", + "libc", + "mio", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.18" @@ -89,10 +356,101 @@ version = "0.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "wireframe" version = "0.1.0" dependencies = [ "bincode", + "futures", + "num_cpus", "serde", + "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index af89b76f..0f795c61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,9 @@ edition = "2024" [dependencies] serde = { version = "1", features = ["derive"] } bincode = "2" +tokio = { version = "1", default-features = false, features = ["net", "signal", "rt-multi-thread", "macros", "sync", "time"] } +futures = "0.3" +num_cpus = "^1" [lints.clippy] pedantic = "warn" diff --git a/docs/roadmap.md b/docs/roadmap.md index 24c4a9e6..3c943a5e 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -17,9 +17,11 @@ after formatting. Line numbers below refer to that file. Clarify method signatures (`new`, `route`, `service`, `wrap`), expose a consistent `Result` error strategy, and allow registration calls in any order for ergonomic chaining. - - [ ] Implement `WireframeServer`. - Outline how worker threads are spawned via Tokio, with graceful - shutdown using a signal and per-worker `WireframeApp` instances. + - [x] Implement `WireframeServer`. + Worker tasks are spawned using Tokio. Each thread receives its own + `WireframeApp` instance from a factory closure. A Ctrl+C signal triggers + graceful shutdown, notifying all workers to stop accepting new + connections. - [ ] Standardise supporting trait definitions. Provide naming conventions and generic bounds for the `FrameProcessor` trait, state extractors and middleware via diff --git a/src/lib.rs b/src/lib.rs index 6bb259bf..9c88284c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,3 @@ pub mod app; pub mod message; +pub mod server; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 00000000..6ac1fa3c --- /dev/null +++ b/src/server.rs @@ -0,0 +1,187 @@ +use std::io; +use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::sync::Arc; + +use tokio::net::TcpListener; +use tokio::sync::broadcast; +use tokio::time::{Duration, sleep}; + +use crate::app::WireframeApp; + +/// Tokio-based server for `WireframeApp` instances. +/// +/// `WireframeServer` spawns a worker task per thread. Each worker +/// receives its own `WireframeApp` from the provided factory +/// closure. The server listens for a shutdown signal using +/// `tokio::signal::ctrl_c` and notifies all workers to stop +/// accepting new connections. +pub struct WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, +{ + factory: F, + listener: Option>, + workers: usize, +} + +impl WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, +{ + /// Constructs a new `WireframeServer` using the provided application factory closure. + /// + /// The server is initialised with a default worker count equal to the number of CPU cores. + /// + /// # Examples + /// + /// ``` + /// let server = WireframeServer::new(|| WireframeApp::default()); + /// ``` + pub fn new(factory: F) -> Self { + Self { + factory, + listener: None, + workers: num_cpus::get(), + } + } + + /// Set the number of worker tasks to spawn. + #[must_use] + /// Sets the number of worker tasks to spawn for the server. + /// + /// Ensures that at least one worker is configured, even if a lower value is provided. + /// + /// # Parameters + /// - `count`: Desired number of worker tasks. + /// + /// # Returns + /// A new `WireframeServer` instance with the updated worker count. + /// + /// # Examples + /// + /// ``` + /// let server = WireframeServer::new(factory).workers(4); + /// ``` + pub fn workers(mut self, count: usize) -> Self { + self.workers = count.max(1); + self + } + + /// Bind the server to the given address and create a listener. + /// + /// # Errors + /// + /// Binds the server to the specified socket address and prepares it for accepting TCP connections. + /// + /// Returns an error if binding to the address or configuring the listener fails. + /// + /// # Arguments + /// + /// * `addr` - The socket address to bind the server to. + /// + /// # Returns + /// + /// An updated server instance with the listener configured, or an `io::Error` if binding fails. + /// + /// # Examples + /// + /// ``` + /// use std::net::SocketAddr; + /// let server = WireframeServer::new(factory); + /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + /// let server = server.bind(addr).expect("Failed to bind address"); + /// ``` + pub fn bind(mut self, addr: SocketAddr) -> io::Result { + let std_listener = StdTcpListener::bind(addr)?; + std_listener.set_nonblocking(true)?; + let listener = TcpListener::from_std(std_listener)?; + self.listener = Some(Arc::new(listener)); + Ok(self) + } + + /// Run the server until a shutdown signal is received. + /// + /// Each worker accepts connections concurrently and would + /// process them using its `WireframeApp`. Connection handling + /// logic is not yet implemented. + /// + /// # Errors + /// + /// Returns an [`io::Error`] if accepting a connection fails. + /// + /// # Panics + /// + /// Runs the server, accepting TCP connections concurrently until shutdown. + /// + /// Spawns the configured number of worker tasks, each accepting incoming connections using a shared listener and a separate `WireframeApp` instance. The server listens for a Ctrl+C signal to initiate graceful shutdown, signalling all workers to stop accepting new connections. Waits for all worker tasks to complete before returning. + /// + /// # Panics + /// + /// Panics if called before `bind` has been invoked. + /// + /// # Returns + /// + /// Returns `Ok(())` when the server shuts down gracefully, or an `io::Error` if accepting connections fails during runtime. + /// + /// # Examples + /// + /// ``` + /// # use std::net::SocketAddr; + /// # use mycrate::{WireframeServer, WireframeApp}; + /// # async fn run_server() -> std::io::Result<()> { + /// let factory = || WireframeApp::new(); + /// let server = WireframeServer::new(factory) + /// .workers(4) + /// .bind("127.0.0.1:8080".parse::().unwrap())?; + /// server.run().await + /// # } + /// ``` + pub async fn run(self) -> io::Result<()> { + let listener = self.listener.expect("`bind` must be called before `run`"); + let (shutdown_tx, _) = broadcast::channel(16); + + // Spawn worker tasks using Tokio's runtime. + let mut handles = Vec::with_capacity(self.workers); + for _ in 0..self.workers { + let mut shutdown_rx = shutdown_tx.subscribe(); + let listener = Arc::clone(&listener); + let factory = self.factory.clone(); + handles.push(tokio::spawn(async move { + let app = (factory)(); + let mut delay = Duration::from_millis(10); + loop { + tokio::select! { + res = listener.accept() => match res { + Ok((_stream, _)) => { + // TODO: hand off stream to `app` + delay = Duration::from_millis(10); + } + Err(e) => { + eprintln!("accept error: {e}"); + sleep(delay).await; + delay = (delay * 2).min(Duration::from_secs(1)); + } + }, + _ = shutdown_rx.recv() => break, + } + } + drop(app); + })); + } + + // Wait for Ctrl+C or workers finishing. + let join_all = futures::future::join_all(handles); + tokio::pin!(join_all); + + tokio::select! { + _ = tokio::signal::ctrl_c() => { + let _ = shutdown_tx.send(()); + } + _ = &mut join_all => {} + } + + // Ensure all workers have exited before returning. + join_all.await; + Ok(()) + } +}