From 2b00ea787230598ddf72174e0fbdadffcd4a6c0c Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 10:19:02 +0100 Subject: [PATCH 1/3] Refactor worker task --- src/server.rs | 142 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 111 insertions(+), 31 deletions(-) diff --git a/src/server.rs b/src/server.rs index f1592a19..9ce78059 100644 --- a/src/server.rs +++ b/src/server.rs @@ -372,12 +372,39 @@ where } } -/// Runs a worker task that accepts incoming TCP connections and processes them asynchronously. -/// -/// Each accepted connection is handled in a separate task, with optional callbacks for preamble -/// decode success or failure. The worker listens for shutdown signals to terminate gracefully. -/// Accept errors are retried with exponential backoff. -async fn worker_task( +/// Spawn a task to process a single TCP connection, logging and discarding any +/// panics from the task. +fn spawn_connection_task( + stream: tokio::net::TcpStream, + factory: F, + on_success: Option>, + on_failure: Option, + tracker: &TaskTracker, +) where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + T: Preamble, +{ + let peer_addr = stream.peer_addr().ok(); + tracker.spawn(async move { + use futures::FutureExt as _; + let fut = + std::panic::AssertUnwindSafe(process_stream(stream, factory, on_success, on_failure)) + .catch_unwind(); + + if let Err(panic) = fut.await { + let panic_msg = panic + .downcast_ref::<&str>() + .copied() + .or_else(|| panic.downcast_ref::().map(String::as_str)) + .unwrap_or(""); + tracing::error!(panic = %panic_msg, ?peer_addr, "connection task panicked"); + } + }); +} + +/// Accept incoming connections until `shutdown` is triggered, retrying on +/// errors with exponential backoff. +async fn accept_loop( listener: Arc, factory: F, on_success: Option>, @@ -386,7 +413,6 @@ async fn worker_task( tracker: TaskTracker, ) where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, - // `Preamble` ensures `T` supports borrowed decoding. T: Preamble, { let mut delay = Duration::from_millis(10); @@ -398,28 +424,13 @@ async fn worker_task( res = listener.accept() => match res { Ok((stream, _)) => { - let success = on_success.clone(); - let failure = on_failure.clone(); - let factory = factory.clone(); - let t = tracker.clone(); - // Capture peer address for better error context - let peer_addr = stream.peer_addr().ok(); - t.spawn(async move { - use futures::FutureExt as _; - let fut = std::panic::AssertUnwindSafe( - process_stream(stream, factory, success, failure), - ) - .catch_unwind(); - - if let Err(panic) = fut.await { - let panic_msg = panic - .downcast_ref::<&str>() - .copied() - .or_else(|| panic.downcast_ref::().map(String::as_str)) - .unwrap_or(""); - tracing::error!(panic = %panic_msg, ?peer_addr, "connection task panicked"); - } - }); + spawn_connection_task( + stream, + factory.clone(), + on_success.clone(), + on_failure.clone(), + &tracker, + ); delay = Duration::from_millis(10); } Err(e) => { @@ -432,6 +443,26 @@ async fn worker_task( } } +/// Runs a worker task that accepts incoming TCP connections and processes them +/// asynchronously. +/// +/// Each accepted connection is handled in a separate task, with optional callbacks for preamble +/// decode success or failure. The worker listens for shutdown signals to terminate gracefully. +async fn worker_task( + listener: Arc, + factory: F, + on_success: Option>, + on_failure: Option, + shutdown: CancellationToken, + tracker: TaskTracker, +) where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + // `Preamble` ensures `T` supports borrowed decoding. + T: Preamble, +{ + accept_loop(listener, factory, on_success, on_failure, shutdown, tracker).await; +} + /// Processes an incoming TCP stream by decoding a preamble and dispatching the connection to a /// `WireframeApp`. /// @@ -857,14 +888,14 @@ mod tests { #[rstest] #[tokio::test] - async fn test_worker_task_shutdown_signal( + async fn test_accept_loop_shutdown_signal( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, ) { let token = CancellationToken::new(); let tracker = TaskTracker::new(); let listener = Arc::new(TcpListener::bind("127.0.0.1:0").await.unwrap()); - tracker.spawn(worker_task::<_, ()>( + tracker.spawn(accept_loop::<_, ()>( listener, factory, None, @@ -922,6 +953,55 @@ mod tests { assert!(cfg!(debug_assertions)); } + /// Panics in connection handlers are logged and do not tear down the worker. + #[rstest] + #[traced_test] + #[tokio::test] + async fn spawn_connection_task_logs_panic( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + ) { + let app_factory = move || { + factory() + .on_connection_setup(|| async { panic!("boom") }) + .unwrap() + }; + let tracker = TaskTracker::new(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let handle = tokio::spawn({ + let tracker = tracker.clone(); + async move { + let (stream, _) = listener.accept().await.unwrap(); + spawn_connection_task::<_, ()>(stream, app_factory, None, None, &tracker); + tracker.close(); + tracker.wait().await; + } + }); + + let client = TcpStream::connect(addr).await.unwrap(); + let peer_addr = client.local_addr().unwrap(); + client.writable().await.unwrap(); + client.try_write(&[0; 8]).unwrap(); + drop(client); + + handle.await.unwrap(); + + tokio::task::yield_now().await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("connection task panicked") + && line.contains("panic=boom") + && line.contains(&format!("peer_addr=Some({peer_addr})")) + }) + .map(|_| ()) + .ok_or_else(|| "panic log not found".to_string()) + }); + } + /// Ensure the server survives panicking connection tasks. /// /// The test spawns a server with a connection setup callback that From f4299e9c42e74aa2d75714d76c130f11cbcbc747 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 14:45:04 +0100 Subject: [PATCH 2/3] Log peer address retrieval errors --- src/server.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 9ce78059..0f00e135 100644 --- a/src/server.rs +++ b/src/server.rs @@ -384,7 +384,13 @@ fn spawn_connection_task( F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, T: Preamble, { - let peer_addr = stream.peer_addr().ok(); + let peer_addr = match stream.peer_addr() { + Ok(addr) => Some(addr), + Err(e) => { + tracing::warn!(error = %e, "Failed to retrieve peer address"); + None + } + }; tracker.spawn(async move { use futures::FutureExt as _; let fut = From 26a33aa175051e5b0d7b9dabf0bc1ac35ae0598f Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 21:10:58 +0100 Subject: [PATCH 3/3] Update documentation to reflect the function's simplified role. Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- src/server.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/server.rs b/src/server.rs index 0f00e135..f16e2124 100644 --- a/src/server.rs +++ b/src/server.rs @@ -449,11 +449,10 @@ async fn accept_loop( } } -/// Runs a worker task that accepts incoming TCP connections and processes them -/// asynchronously. +/// Worker task that delegates connection acceptance to `accept_loop`. /// -/// Each accepted connection is handled in a separate task, with optional callbacks for preamble -/// decode success or failure. The worker listens for shutdown signals to terminate gracefully. +/// This function serves as an entry point for worker tasks, passing all parameters +/// to `accept_loop` which handles the actual connection acceptance and processing. async fn worker_task( listener: Arc, factory: F,