From 4d14e6483c21f714e035900171242b43075f55db Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 13 Jun 2025 15:49:06 +0100 Subject: [PATCH 1/2] Add physics engine DDlog schema --- physics/physics.dl | 38 ++++++++++++++++ src/extractor.rs | 46 ++++++++++--------- src/middleware.rs | 58 +++--------------------- src/server.rs | 109 +++++++-------------------------------------- 4 files changed, 83 insertions(+), 168 deletions(-) create mode 100644 physics/physics.dl diff --git a/physics/physics.dl b/physics/physics.dl new file mode 100644 index 00000000..70614a35 --- /dev/null +++ b/physics/physics.dl @@ -0,0 +1,38 @@ +// DDlog definitions for the experimental physics engine. +// +// These types model spatial positions, individual blocks and +// the slopes that connect them. They will be extended as the +// engine evolves. + +type Position = struct { + x: i32, + y: i32, + z: i32, +} + +enum BlockType { + Air, + Solid, + Water, +} + +type Block = struct { + pos: Position, + kind: BlockType, +} + +type Slope = struct { + from: Position, + to: Position, + gradient: i32, +} + +input relation Blocks(b: Block) +input relation Slopes(s: Slope) + +// Derived relation listing all occupied positions. +output relation Occupied(p: Position) + +Occupied(p) :- Blocks(Block{pos=p, ..}). +Occupied(p) :- Slopes(Slope{from=p, ..}). +Occupied(p) :- Slopes(Slope{to=p, ..}). diff --git a/src/extractor.rs b/src/extractor.rs index 6f31a4bd..3cd9767e 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -69,34 +69,13 @@ impl SharedState { /// assert_eq!(*state, 5); /// ``` #[must_use] - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn advance_consumes_bytes() { - let mut payload = Payload { data: b"hello" }; - payload.advance(2); - assert_eq!(payload.data, b"llo"); - payload.advance(10); - assert!(payload.data.is_empty()); - } - - #[test] - fn remaining_reports_length() { - let mut payload = Payload { data: b"abc" }; - assert_eq!(payload.remaining(), 3); - payload.advance(1); - assert_eq!(payload.remaining(), 2); - } -} /// Creates a new `SharedState` instance wrapping the provided `Arc`. /// /// # Examples /// /// ``` /// use std::sync::Arc; + /// use wireframe::extractor::SharedState; /// let state = Arc::new(42); /// let shared = SharedState::new(state.clone()); /// assert_eq!(*shared, 42); @@ -117,6 +96,7 @@ impl std::ops::Deref for SharedState { /// /// ``` /// use std::sync::Arc; + /// use wireframe::extractor::SharedState; /// let state = Arc::new(42); /// let shared = SharedState::new(state.clone()); /// assert_eq!(*shared, 42); @@ -125,3 +105,25 @@ impl std::ops::Deref for SharedState { &self.0 } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn advance_consumes_bytes() { + let mut payload = Payload { data: b"hello" }; + payload.advance(2); + assert_eq!(payload.data, b"llo"); + payload.advance(10); + assert!(payload.data.is_empty()); + } + + #[test] + fn remaining_reports_length() { + let mut payload = Payload { data: b"abc" }; + assert_eq!(payload.remaining(), 3); + payload.advance(1); + assert_eq!(payload.remaining(), 2); + } +} diff --git a/src/middleware.rs b/src/middleware.rs index ba5ab521..b2223ee1 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,91 +1,43 @@ use async_trait::async_trait; -/// Incoming request wrapper passed through middleware. #[derive(Debug)] pub struct ServiceRequest; -/// Response produced by a handler or middleware. #[derive(Debug, Default)] pub struct ServiceResponse; -/// Continuation used by middleware to call the next service in the chain. -pub struct Next<'a, S> -where - S: Service + ?Sized, -{ +pub struct Next<'a, S: Service + ?Sized> { service: &'a S, } -impl<'a, S> Next<'a, S> -where - S: Service + ?Sized, -{ - /// Creates a new `Next` instance wrapping a reference to the given service. - /// -/// -/// ```ignore -/// use wireframe::middleware::{ServiceRequest, ServiceResponse, Next, Service}; -/// ``` - /// Service produced by the middleware. - type Wrapped: Service; - async fn transform(&self, service: S) -> Self::Wrapped; - /// let service = MyService::default(); - /// let next = Next::new(&service); - type Wrapped: Service; - async fn transform(&self, service: S) -> Self::Wrapped; +impl<'a, S: Service + ?Sized> Next<'a, S> { + pub fn new(service: &'a S) -> Self { Self { service } } - /// Call the next service with the given request. + /// Call the next service with the provided request. /// /// # Errors /// - /// Asynchronously invokes the next service in the middleware chain with the given request. - /// - /// Returns the response from the wrapped service, or propagates any error produced. - /// - /// # Examples - /// - /// ``` - /// # use your_crate::{ServiceRequest, ServiceResponse, Next, Service}; - /// # struct DummyService; - /// # #[async_trait::async_trait] - /// # impl Service for DummyService { - /// # type Error = std::convert::Infallible; - /// # async fn call(&self, _req: ServiceRequest) -> Result { - /// # Ok(ServiceResponse::default()) - /// # } - /// # } - /// # let service = DummyService; - /// let next = Next::new(&service); - /// let req = ServiceRequest {}; - /// let res = tokio_test::block_on(next.call(req)); - /// assert!(res.is_ok()); - /// ``` + /// Propagates any error returned by the wrapped service. pub async fn call(&self, req: ServiceRequest) -> Result { self.service.call(req).await } } -/// Trait representing an asynchronous service. #[async_trait] pub trait Service: Send + Sync { - /// Error type returned by the service. type Error: std::error::Error + Send + Sync + 'static; - /// Handle the incoming request and produce a response. async fn call(&self, req: ServiceRequest) -> Result; } -/// Factory for wrapping services with middleware. #[async_trait] pub trait Transform: Send + Sync where S: Service, { - /// Wrapped service produced by the middleware. type Output: Service; - /// Create a new middleware service wrapping `service`. async fn transform(&self, service: S) -> Self::Output; } diff --git a/src/server.rs b/src/server.rs index 66219a5a..fec63963 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,11 +10,9 @@ use crate::app::WireframeApp; /// Tokio-based server for `WireframeApp` instances. /// -/// `WireframeServer` spawns a worker task per thread. Each worker -/// receives its own `WireframeApp` from the provided factory -/// closure. The server listens for a shutdown signal using -/// `tokio::signal::ctrl_c` and notifies all workers to stop -/// accepting new connections. +/// Each worker receives its own application instance from the provided +/// factory. A Ctrl+C signal triggers graceful shutdown across all +/// workers. pub struct WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, @@ -28,71 +26,28 @@ impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, { - /// Constructs a new `WireframeServer` using the provided application factory closure. - /// - /// The server is initialised with a default worker count equal to the number of CPU cores. - /// - /// ```no_run - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let factory = || WireframeApp::new().unwrap(); - /// let server = WireframeServer::new(factory); - /// ``` + /// Create a new server using the given application factory. + #[must_use] + pub fn new(factory: F) -> Self { + Self { + factory, + listener: None, workers: num_cpus::get().max(1), + } + } /// Set the number of worker tasks to spawn for the server. - /// - /// #[tokio::main] - /// async fn main() -> std::io::Result<()> { - /// let factory = || WireframeApp::new().unwrap(); - /// WireframeServer::new(factory) - /// .workers(4) - /// .bind("127.0.0.1:0".parse().unwrap())? - /// .run() - /// .await - /// } - /// A new `WireframeServer` instance with the updated worker count. - /// - /// # Examples - /// - /// ```ignore - /// let server = WireframeServer::new(factory).workers(4); - /// Sets the number of worker tasks for the server, ensuring at least one worker. - /// - /// # Examples - /// - /// ```ignore - /// let server = WireframeServer::new(factory).workers(4); - /// ``` + #[must_use] pub fn workers(mut self, count: usize) -> Self { self.workers = count.max(1); self } - /// Bind the server to the given address and create a listener. + /// Bind the server to the provided socket address. /// /// # Errors /// - /// Binds the server to the specified socket address and prepares it for accepting TCP connections. - /// - /// Returns an error if binding to the address or configuring the listener fails. - /// - /// # Arguments - /// - /// * `addr` - The socket address to bind the server to. - /// - /// # Returns - /// - /// An updated server instance with the listener configured, or an `io::Error` if binding fails. - /// - /// # Examples - /// - /// ```ignore - /// use std::net::SocketAddr; - /// let server = WireframeServer::new(factory); - /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); - /// let server = server.bind(addr).expect("Failed to bind address"); - /// ``` + /// Returns any I/O error produced while creating the TCP listener. pub fn bind(mut self, addr: SocketAddr) -> io::Result { let std_listener = StdTcpListener::bind(addr)?; std_listener.set_nonblocking(true)?; @@ -103,46 +58,17 @@ where /// Run the server until a shutdown signal is received. /// - /// Each worker accepts connections concurrently and would - /// process them using its `WireframeApp`. Connection handling - /// logic is not yet implemented. - /// /// # Errors /// - /// Returns an [`io::Error`] if accepting a connection fails. + /// Returns any I/O error encountered while accepting connections. /// /// # Panics /// - /// Runs the server, accepting TCP connections concurrently until shutdown. - /// - /// Spawns the configured number of worker tasks, each accepting incoming connections using a shared listener and a separate `WireframeApp` instance. The server listens for a Ctrl+C signal to initiate graceful shutdown, signalling all workers to stop accepting new connections. Waits for all worker tasks to complete before returning. - /// - /// # Panics - /// - /// Panics if called before `bind` has been invoked. - /// - /// # Returns - /// - /// Returns `Ok(())` when the server shuts down gracefully, or an `io::Error` if accepting connections fails during runtime. - /// - /// # Examples - /// - /// ```ignore - /// # use std::net::SocketAddr; - /// # use mycrate::{WireframeServer, WireframeApp}; - /// # async fn run_server() -> std::io::Result<()> { - /// let factory = || WireframeApp::new(); - /// let server = WireframeServer::new(factory) - /// .workers(4) - /// .bind("127.0.0.1:8080".parse::().unwrap())?; - /// server.run().await - /// # } - /// ``` + /// Panics if called before [`bind`] has configured the listener. pub async fn run(self) -> io::Result<()> { let listener = self.listener.expect("`bind` must be called before `run`"); let (shutdown_tx, _) = broadcast::channel(16); - // Spawn worker tasks using Tokio's runtime. let mut handles = Vec::with_capacity(self.workers); for _ in 0..self.workers { let mut shutdown_rx = shutdown_tx.subscribe(); @@ -155,7 +81,6 @@ where tokio::select! { res = listener.accept() => match res { Ok((_stream, _)) => { - // TODO: hand off stream to `app` delay = Duration::from_millis(10); } Err(e) => { @@ -171,7 +96,6 @@ where })); } - // Wait for Ctrl+C or workers finishing. let join_all = futures::future::join_all(handles); tokio::pin!(join_all); @@ -182,7 +106,6 @@ where _ = &mut join_all => {} } - // Ensure all workers have exited before returning. join_all.await; Ok(()) } From 67de183f2a752118894b2fbdb66d5f95ff5213f7 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 13 Jun 2025 19:05:14 +0100 Subject: [PATCH 2/2] Refine DDlog schema and docs --- physics/physics.dl | 8 +++++--- src/extractor.rs | 19 +++---------------- src/middleware.rs | 2 +- 3 files changed, 9 insertions(+), 20 deletions(-) diff --git a/physics/physics.dl b/physics/physics.dl index 70614a35..bb607e35 100644 --- a/physics/physics.dl +++ b/physics/physics.dl @@ -33,6 +33,8 @@ input relation Slopes(s: Slope) // Derived relation listing all occupied positions. output relation Occupied(p: Position) -Occupied(p) :- Blocks(Block{pos=p, ..}). -Occupied(p) :- Slopes(Slope{from=p, ..}). -Occupied(p) :- Slopes(Slope{to=p, ..}). +// Use a single rule to avoid redundant facts if aggregation is added later. +Occupied(p) :- + Blocks(Block{pos=p, ..}); + Slopes(Slope{from=p, ..}); + Slopes(Slope{to=p, ..}). diff --git a/src/extractor.rs b/src/extractor.rs index 3cd9767e..b6aacee6 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -56,30 +56,17 @@ pub trait FromMessageRequest: Sized { pub struct SharedState(Arc); impl SharedState { - /// Construct a new [`SharedState`]. - /// - /// # Examples - /// - /// ```ignore - /// use std::sync::Arc; - /// use wireframe::extractor::SharedState; - /// - /// let data = Arc::new(5u32); - /// let state = SharedState::new(Arc::clone(&data)); - /// assert_eq!(*state, 5); - /// ``` - #[must_use] - /// Creates a new `SharedState` instance wrapping the provided `Arc`. - /// - /// # Examples + /// Construct a new [`SharedState`] wrapping the provided `Arc`. /// /// ``` /// use std::sync::Arc; /// use wireframe::extractor::SharedState; + /// /// let state = Arc::new(42); /// let shared = SharedState::new(state.clone()); /// assert_eq!(*shared, 42); /// ``` + #[must_use] pub fn new(inner: Arc) -> Self { Self(inner) } diff --git a/src/middleware.rs b/src/middleware.rs index b2223ee1..cad575c0 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -20,7 +20,7 @@ impl<'a, S: Service + ?Sized> Next<'a, S> { /// # Errors /// /// Propagates any error returned by the wrapped service. - pub async fn call(&self, req: ServiceRequest) -> Result { + pub async fn call(&mut self, req: ServiceRequest) -> Result { self.service.call(req).await } }