From 98a6f7406fd72f8fbf784d967ada013355de942b Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 31 Jul 2025 22:31:12 +0100 Subject: [PATCH 01/10] Replace unwraps with expect --- src/frame/tests.rs | 7 +- src/push.rs | 3 +- src/server.rs | 64 +++++++++++------ tests/app_data.rs | 5 +- tests/async_stream.rs | 2 +- tests/connection_actor.rs | 84 ++++++++++++++++------- tests/extractor.rs | 10 +-- tests/lifecycle.rs | 26 ++++--- tests/metadata.rs | 4 +- tests/middleware.rs | 2 +- tests/middleware_order.rs | 29 ++++---- tests/preamble.rs | 38 ++++++----- tests/push.rs | 113 ++++++++++++++++++++++--------- tests/push_policies.rs | 45 +++++++----- tests/response.rs | 28 +++++--- tests/routes.rs | 42 ++++++------ tests/server.rs | 12 ++-- tests/session_registry.rs | 4 +- tests/wireframe_protocol.rs | 15 ++-- wireframe_testing/src/helpers.rs | 2 +- 20 files changed, 349 insertions(+), 186 deletions(-) diff --git a/src/frame/tests.rs b/src/frame/tests.rs index a42ade22..39aea809 100644 --- a/src/frame/tests.rs +++ b/src/frame/tests.rs @@ -24,7 +24,10 @@ fn bytes_to_u64_ok( #[case] endianness: Endianness, #[case] expected: u64, ) { - assert_eq!(bytes_to_u64(&bytes, size, endianness).unwrap(), expected); + assert_eq!( + bytes_to_u64(&bytes, size, endianness).expect("failed to convert"), + expected + ); } #[rstest] @@ -42,7 +45,7 @@ fn u64_to_bytes_ok( #[case] expected: Vec, ) { let mut buf = [0u8; 8]; - let written = u64_to_bytes(value, size, endianness, &mut buf).unwrap(); + let written = u64_to_bytes(value, size, endianness, &mut buf).expect("failed to encode u64"); assert_eq!(written, size); assert_eq!(&buf[..written], expected.as_slice()); } diff --git a/src/push.rs b/src/push.rs index 01103745..c25abff0 100644 --- a/src/push.rs +++ b/src/push.rs @@ -309,7 +309,8 @@ impl PushQueues { high_capacity: usize, low_capacity: usize, ) -> (Self, PushHandle) { - Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None).unwrap() + Self::bounded_with_rate_dlq(high_capacity, 
low_capacity, None, None) + .expect("bounded_no_rate_limit should not fail") } /// Create queues with a custom rate limit in pushes per second. diff --git a/src/server.rs b/src/server.rs index e637106c..572711f1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -572,8 +572,11 @@ mod tests { #[fixture] fn free_port() -> SocketAddr { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = std::net::TcpListener::bind(addr).unwrap(); - listener.local_addr().unwrap() + let listener = + std::net::TcpListener::bind(addr).expect("failed to bind free port listener"); + listener + .local_addr() + .expect("failed to read free port listener address") } fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer @@ -649,7 +652,9 @@ mod tests { free_port: SocketAddr, ) { let server = bind_server(factory, free_port); - let bound_addr = server.local_addr().unwrap(); + let bound_addr = server + .local_addr() + .expect("bound server should return local address"); assert_eq!(bound_addr.ip(), free_port.ip()); } @@ -681,7 +686,10 @@ mod tests { let server = bind_server(factory, free_port); let local_addr = server.local_addr(); assert!(local_addr.is_some()); - assert_eq!(local_addr.unwrap().ip(), free_port.ip()); + assert_eq!( + local_addr.expect("local address missing").ip(), + free_port.ip() + ); } #[rstest] @@ -800,7 +808,7 @@ mod tests { .await; assert!(result.is_ok()); - assert!(result.unwrap().is_ok()); + assert!(result.expect("server run timed out").is_ok()); } #[rstest] @@ -827,7 +835,7 @@ mod tests { let elapsed = start.elapsed(); assert!(result.is_ok()); - assert!(result.unwrap().is_ok()); + assert!(result.expect("server run timed out").is_ok()); assert!(elapsed < Duration::from_millis(500)); } @@ -862,7 +870,7 @@ mod tests { .await; assert!(result.is_ok()); - assert!(result.unwrap().is_ok()); + assert!(result.expect("server run timed out").is_ok()); } #[rstest] @@ -903,7 +911,11 @@ mod tests { ) { let token = CancellationToken::new(); let tracker = 
TaskTracker::new(); - let listener = Arc::new(TcpListener::bind("127.0.0.1:0").await.unwrap()); + let listener = Arc::new( + TcpListener::bind("127.0.0.1:0") + .await + .expect("failed to bind test listener"), + ); tracker.spawn(accept_loop::<_, ()>( listener, @@ -944,15 +956,18 @@ mod tests { let addr1 = free_port; let addr2 = { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = std::net::TcpListener::bind(addr).unwrap(); - listener.local_addr().unwrap() + let listener = + std::net::TcpListener::bind(addr).expect("failed to bind second listener"); + listener + .local_addr() + .expect("failed to get second listener address") }; let server = server.bind(addr1).expect("Failed to bind first address"); - let first_local_addr = server.local_addr().unwrap(); + let first_local_addr = server.local_addr().expect("first bound address missing"); let server = server.bind(addr2).expect("Failed to bind second address"); - let second_local_addr = server.local_addr().unwrap(); + let second_local_addr = server.local_addr().expect("second bound address missing"); assert_ne!(first_local_addr.port(), second_local_addr.port()); assert_eq!(second_local_addr.ip(), addr2.ip()); @@ -1032,13 +1047,19 @@ mod tests { let app_factory = move || { factory() .on_connection_setup(|| async { panic!("boom") }) - .unwrap() + .expect("failed to install panic setup callback") }; let server = WireframeServer::new(app_factory) .workers(1) - .bind("127.0.0.1:0".parse().unwrap()) + .bind( + "127.0.0.1:0" + .parse() + .expect("hard-coded socket address must be valid"), + ) .expect("bind"); - let addr = server.local_addr().unwrap(); + let addr = server + .local_addr() + .expect("failed to retrieve server address"); let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(async move { @@ -1047,22 +1068,27 @@ mod tests { let _ = rx.await; }) .await - .unwrap(); + .expect("server run failed"); }); let first = TcpStream::connect(addr) .await .expect("first connection should 
succeed"); let peer_addr = first.local_addr().expect("first connection peer address"); - first.writable().await.unwrap(); - first.try_write(&[0; 8]).unwrap(); + first + .writable() + .await + .expect("connection not writable after connect"); + first + .try_write(&[0; 8]) + .expect("failed to write dummy bytes"); drop(first); TcpStream::connect(addr) .await .expect("second connection should succeed after panic"); let _ = tx.send(()); - handle.await.unwrap(); + handle.await.expect("server join error"); tokio::task::yield_now().await; diff --git a/tests/app_data.rs b/tests/app_data.rs index f3794f5e..afb60aab 100644 --- a/tests/app_data.rs +++ b/tests/app_data.rs @@ -31,7 +31,8 @@ fn shared_state_extractor_returns_data( mut empty_payload: Payload<'static>, ) { request.insert_state(5u32); - let extracted = SharedState::::from_message_request(&request, &mut empty_payload).unwrap(); + let extracted = SharedState::::from_message_request(&request, &mut empty_payload) + .expect("failed to extract shared state"); assert_eq!(*extracted, 5); } @@ -42,6 +43,6 @@ fn missing_shared_state_returns_error( ) { let err = SharedState::::from_message_request(&request, &mut empty_payload) .err() - .unwrap(); + .expect("missing state error expected"); assert!(matches!(err, ExtractError::MissingState(_))); } diff --git a/tests/async_stream.rs b/tests/async_stream.rs index 66b39427..86f1482f 100644 --- a/tests/async_stream.rs +++ b/tests/async_stream.rs @@ -29,6 +29,6 @@ async fn async_stream_frames_processed_in_order() { let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(out, vec![0, 1, 2]); } diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index c7a8bd64..867fe451 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -45,14 +45,20 @@ async fn strict_priority_order( shutdown_token: 
CancellationToken, ) { let (queues, handle) = queues; - handle.push_low_priority(2).await.unwrap(); - handle.push_high_priority(1).await.unwrap(); + handle + .push_low_priority(2) + .await + .expect("push low priority failed"); + handle + .push_high_priority(1) + .await + .expect("push high priority failed"); let stream = stream::iter(vec![Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(out, vec![1, 2, 3]); } @@ -70,15 +76,21 @@ async fn fairness_yields_low_after_burst( }; for n in 1..=5 { - handle.push_high_priority(n).await.unwrap(); + handle + .push_high_priority(n) + .await + .expect("push high priority failed"); } - handle.push_low_priority(99).await.unwrap(); + handle + .push_low_priority(99) + .await + .expect("push low priority failed"); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown_token); actor.set_fairness(fairness); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(out, vec![1, 2, 99, 3, 4, 5]); } @@ -198,19 +210,34 @@ async fn fairness_yields_low_with_time_slice( let _ = tx.send(out); }); - handle.push_high_priority(1).await.unwrap(); + handle + .push_high_priority(1) + .await + .expect("push high priority failed"); tokio::time::advance(Duration::from_millis(5)).await; - handle.push_high_priority(2).await.unwrap(); + handle + .push_high_priority(2) + .await + .expect("push high priority failed"); tokio::time::advance(Duration::from_millis(15)).await; - handle.push_low_priority(42).await.unwrap(); + handle + .push_low_priority(42) + .await + .expect("push low priority failed"); for n in 3..=5 { - handle.push_high_priority(n).await.unwrap(); + handle + .push_high_priority(n) + .await + .expect("push high priority 
failed"); } drop(handle); - let out = rx.await.unwrap(); + let out = rx.await.expect("actor output missing"); assert!(out.contains(&42), "Low-priority item was not yielded"); - let pos = out.iter().position(|x| *x == 42).unwrap(); + let pos = out + .iter() + .position(|x| *x == 42) + .expect("value 42 should be present"); assert!( pos > 0 && pos < out.len() - 1, "Low-priority item should be yielded in the middle" @@ -230,7 +257,7 @@ async fn shutdown_signal_precedence( ConnectionActor::new(queues, handle, None, shutdown_token); // drop the handle after actor creation to mimic early disconnection let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert!(out.is_empty()); } @@ -242,14 +269,17 @@ async fn complete_draining_of_sources( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - handle.push_high_priority(1).await.unwrap(); + handle + .push_high_priority(1) + .await + .expect("push high priority failed"); let stream = stream::iter(vec![Ok(2u8), Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); // drop handle after actor setup let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(out, vec![1, 2, 3]); } @@ -289,7 +319,7 @@ async fn error_propagation_from_stream( hooks, ); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(called.load(Ordering::SeqCst), 1); assert_eq!(out, vec![1, 2]); } @@ -307,7 +337,7 @@ async fn protocol_error_logs_warning( let mut actor: ConnectionActor<_, TestError> = ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert!(out.is_empty()); let mut found = false; while let 
Some(record) = logger.pop() { @@ -364,7 +394,7 @@ async fn interleaved_shutdown_during_stream( let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert!(!out.is_empty() && out.len() < 5); } @@ -373,7 +403,10 @@ async fn interleaved_shutdown_during_stream( #[serial] async fn push_queue_exhaustion_backpressure() { let (mut queues, handle) = PushQueues::bounded(1, 1); - handle.push_high_priority(1).await.unwrap(); + handle + .push_high_priority(1) + .await + .expect("push high priority failed"); let blocked = timeout(Duration::from_millis(50), handle.push_high_priority(2)).await; assert!(blocked.is_err()); @@ -399,7 +432,10 @@ async fn before_send_hook_modifies_frames( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - handle.push_high_priority(1).await.unwrap(); + handle + .push_high_priority(1) + .await + .expect("push high priority failed"); let stream = stream::iter(vec![Ok(2u8)]); let hooks = ProtocolHooks { @@ -415,7 +451,7 @@ async fn before_send_hook_modifies_frames( hooks, ); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(out, vec![2, 3]); } @@ -446,7 +482,7 @@ async fn on_command_end_hook_runs( hooks, ); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(counter.load(Ordering::SeqCst), 1); } @@ -495,7 +531,7 @@ async fn connection_count_decrements_on_abort( assert_eq!(during, before + 1); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); let after = wireframe::connection::active_connection_count(); assert_eq!(during - after, 1); } @@ -516,7 +552,7 @@ async fn connection_count_decrements_on_close( assert_eq!(during, before + 
1); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); let after = wireframe::connection::active_connection_count(); assert_eq!(during - after, 1); } diff --git a/tests/extractor.rs b/tests/extractor.rs index 57fc3349..c34c32a5 100644 --- a/tests/extractor.rs +++ b/tests/extractor.rs @@ -35,10 +35,11 @@ struct TestMsg(u8); #[rstest] fn message_extractor_parses_and_advances(request: MessageRequest) { let msg = TestMsg(42); - let bytes = msg.to_bytes().unwrap(); + let bytes = msg.to_bytes().expect("failed to serialise message"); let mut payload = Payload::new(bytes.as_slice()); - let extracted = Message::::from_message_request(&request, &mut payload).unwrap(); + let extracted = Message::::from_message_request(&request, &mut payload) + .expect("failed to extract TestMsg from payload"); assert_eq!(*extracted, msg); assert_eq!(payload.remaining(), 0); } @@ -51,7 +52,8 @@ fn connection_info_reports_peer(mut request: MessageRequest, mut empty_payload: .parse() .expect("hard-coded socket address must be valid"); request.peer_addr = Some(addr); - let info = ConnectionInfo::from_message_request(&request, &mut empty_payload).unwrap(); + let info = ConnectionInfo::from_message_request(&request, &mut empty_payload) + .expect("failed to build ConnectionInfo"); assert_eq!(info.peer_addr(), Some(addr)); } @@ -66,7 +68,7 @@ fn shared_state_extractor(mut request: MessageRequest, mut empty_payload: Payloa let state = wireframe::extractor::SharedState::::from_message_request(&request, &mut empty_payload) - .unwrap(); + .expect("failed to extract shared state"); assert_eq!(*state, 42); } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index a4088320..a81068b0 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -50,11 +50,11 @@ where let teardown_cb = call_counting_callback(teardown, ()); WireframeApp::<_, _, E>::new_with_envelope() - .unwrap() + .expect("failed to create app") .on_connection_setup(move || 
setup_cb(())) - .unwrap() + .expect("setup callback") .on_connection_teardown(teardown_cb) - .unwrap() + .expect("teardown callback") } #[tokio::test] @@ -75,9 +75,9 @@ async fn setup_without_teardown_runs() { let cb = call_counting_callback(&setup_count, ()); let app = WireframeApp::new() - .unwrap() + .expect("failed to create app") .on_connection_setup(move || cb(())) - .unwrap(); + .expect("setup callback"); run_with_duplex_server(app).await; @@ -90,9 +90,9 @@ async fn teardown_without_setup_does_not_run() { let cb = call_counting_callback(&teardown_count, ()); let app = WireframeApp::new() - .unwrap() + .expect("failed to create app") .on_connection_teardown(cb) - .unwrap(); + .expect("teardown callback"); run_with_duplex_server(app).await; @@ -121,19 +121,23 @@ async fn helpers_propagate_connection_state() { let app = wireframe_app_with_lifecycle_callbacks::(&setup, &teardown, 7) .frame_processor(processor()) .route(1, Arc::new(|_: &StateEnvelope| Box::pin(async {}))) - .unwrap(); + .expect("route registration failed"); let env = StateEnvelope { id: 1, msg: vec![1], }; - let bytes = BincodeSerializer.serialize(&env).unwrap(); + let bytes = BincodeSerializer + .serialize(&env) + .expect("failed to serialise envelope"); let mut frame = BytesMut::new(); LengthPrefixedProcessor::default() .encode(&bytes, &mut frame) - .unwrap(); + .expect("encode should succeed"); - let out = run_app_with_frame(app, frame.to_vec()).await.unwrap(); + let out = run_app_with_frame(app, frame.to_vec()) + .await + .expect("app run failed"); assert!(!out.is_empty()); assert_eq!(setup.load(Ordering::SeqCst), 1); assert_eq!(teardown.load(Ordering::SeqCst), 1); diff --git a/tests/metadata.rs b/tests/metadata.rs index 200e60c4..86c9a3bf 100644 --- a/tests/metadata.rs +++ b/tests/metadata.rs @@ -19,11 +19,11 @@ where S: TestSerializer, { WireframeApp::new() - .unwrap() + .expect("failed to create app") .frame_processor(LengthPrefixedProcessor::default()) .serializer(serializer) .route(1, 
Arc::new(|_| Box::pin(async {}))) - .unwrap() + .expect("route registration failed") } struct CountingSerializer(Arc); diff --git a/tests/middleware.rs b/tests/middleware.rs index 313ec16e..a4f56f54 100644 --- a/tests/middleware.rs +++ b/tests/middleware.rs @@ -55,6 +55,6 @@ async fn middleware_modifies_request_and_response() { let wrapped = mw.transform(service).await; let request = ServiceRequest::new(vec![1, 2, 3]); - let response = wrapped.call(request).await.unwrap(); + let response = wrapped.call(request).await.expect("middleware call failed"); assert_eq!(response.frame(), &[1, 2, 3, b'!', b'?']); } diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index 196c39e4..6f8bdb6a 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -54,35 +54,40 @@ impl Transform> for TagMiddleware { async fn middleware_applied_in_reverse_order() { let handler: Handler = std::sync::Arc::new(|_env: &Envelope| Box::pin(async {})); let app = WireframeApp::new() - .unwrap() + .expect("failed to create app") .route(1, handler) - .unwrap() + .expect("route registration failed") .wrap(TagMiddleware(b'A')) - .unwrap() + .expect("wrap failed") .wrap(TagMiddleware(b'B')) - .unwrap(); + .expect("wrap failed"); let (mut client, server) = duplex(256); let env = Envelope::new(1, vec![b'X']); let serializer = BincodeSerializer; - let bytes = serializer.serialize(&env).unwrap(); + let bytes = serializer.serialize(&env).expect("serialization failed"); // Use the default 4-byte big-endian length prefix for framing let processor = LengthPrefixedProcessor::default(); let mut buf = BytesMut::new(); - processor.encode(&bytes, &mut buf).unwrap(); - client.write_all(&buf).await.unwrap(); - client.shutdown().await.unwrap(); + processor.encode(&bytes, &mut buf).expect("encoding failed"); + client.write_all(&buf).await.expect("write failed"); + client.shutdown().await.expect("shutdown failed"); let handle = tokio::spawn(async move { app.handle_connection(server).await }); 
let mut out = Vec::new(); - client.read_to_end(&mut out).await.unwrap(); - handle.await.unwrap(); + client.read_to_end(&mut out).await.expect("read failed"); + handle.await.expect("join failed"); let mut buf = BytesMut::from(&out[..]); - let frame = processor.decode(&mut buf).unwrap().unwrap(); - let (resp, _) = serializer.deserialize::(&frame).unwrap(); + let frame = processor + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); + let (resp, _) = serializer + .deserialize::(&frame) + .expect("deserialize failed"); let (_, bytes) = resp.into_parts(); assert_eq!(bytes, vec![b'X', b'A', b'B', b'B', b'A']); } diff --git a/tests/preamble.rs b/tests/preamble.rs index ac5bf586..79d5177f 100644 --- a/tests/preamble.rs +++ b/tests/preamble.rs @@ -68,7 +68,9 @@ where Fut: std::future::Future, B: FnOnce(std::net::SocketAddr) -> Fut, { - let server = server.bind("127.0.0.1:0".parse().unwrap()).expect("bind"); + let server = server + .bind("127.0.0.1:0".parse().expect("hard-coded socket addr")) + .expect("bind"); let addr = server.local_addr().expect("addr"); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let handle = tokio::spawn(async move { @@ -77,25 +79,25 @@ where let _ = shutdown_rx.await; }) .await - .unwrap(); + .expect("server run failed"); }); block(addr).await; let _ = shutdown_tx.send(()); - handle.await.unwrap(); + handle.await.expect("server join failed"); } #[tokio::test] async fn parse_valid_preamble() { let (mut client, mut server) = duplex(64); let bytes = b"TRTPHOTL\x00\x01\x00\x02"; - client.write_all(bytes).await.unwrap(); - client.shutdown().await.unwrap(); + client.write_all(bytes).await.expect("write failed"); + client.shutdown().await.expect("shutdown failed"); let (p, _) = read_preamble::<_, HotlinePreamble>(&mut server) .await .expect("valid preamble"); eprintln!("decoded: {p:?}"); - p.validate().unwrap(); + p.validate().expect("preamble validation failed"); assert_eq!(p.magic, HotlinePreamble::MAGIC); 
assert_eq!(p.min_version, 1); assert_eq!(p.client_version, 2); @@ -105,8 +107,8 @@ async fn parse_valid_preamble() { async fn invalid_magic_is_error() { let (mut client, mut server) = duplex(64); let bytes = b"WRONGMAG\x00\x01\x00\x02"; - client.write_all(bytes).await.unwrap(); - client.shutdown().await.unwrap(); + client.write_all(bytes).await.expect("write failed"); + client.shutdown().await.expect("shutdown failed"); let (preamble, _) = read_preamble::<_, HotlinePreamble>(&mut server) .await .expect("decoded"); @@ -140,7 +142,7 @@ async fn server_triggers_expected_callback( let success_tx = success_tx.clone(); let clone = p.clone(); Box::pin(async move { - if let Some(tx) = success_tx.lock().unwrap().take() { + if let Some(tx) = success_tx.lock().expect("lock poisoned").take() { let _ = tx.send(clone); } Ok(()) @@ -150,7 +152,7 @@ async fn server_triggers_expected_callback( { let failure_tx = failure_tx.clone(); move |_| { - if let Some(tx) = failure_tx.lock().unwrap().take() { + if let Some(tx) = failure_tx.lock().expect("lock poisoned").take() { let _ = tx.send(()); } } @@ -158,9 +160,9 @@ async fn server_triggers_expected_callback( ); with_running_server(server, |addr| async move { - let mut stream = TcpStream::connect(addr).await.unwrap(); - stream.write_all(bytes).await.unwrap(); - stream.shutdown().await.unwrap(); + let mut stream = TcpStream::connect(addr).await.expect("connect failed"); + stream.write_all(bytes).await.expect("write failed"); + stream.shutdown().await.expect("shutdown failed"); }) .await; @@ -200,8 +202,8 @@ async fn success_callback_can_write_response( factory, |_, stream| { Box::pin(async move { - stream.write_all(b"ACK").await.unwrap(); - stream.flush().await.unwrap(); + stream.write_all(b"ACK").await.expect("write failed"); + stream.flush().await.expect("flush failed"); Ok(()) }) }, @@ -209,11 +211,11 @@ async fn success_callback_can_write_response( ); with_running_server(server, |addr| async move { - let mut stream = 
TcpStream::connect(addr).await.unwrap(); + let mut stream = TcpStream::connect(addr).await.expect("connect failed"); let bytes = b"TRTPHOTL\x00\x01\x00\x02"; - stream.write_all(bytes).await.unwrap(); + stream.write_all(bytes).await.expect("write failed"); let mut buf = [0u8; 3]; - stream.read_exact(&mut buf).await.unwrap(); + stream.read_exact(&mut buf).await.expect("read failed"); assert_eq!(&buf, b"ACK"); }) .await; diff --git a/tests/push.rs b/tests/push.rs index 187e5cbd..852aad7d 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -11,11 +11,17 @@ use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues}; async fn frames_routed_to_correct_priority_queues() { let (mut queues, handle) = PushQueues::bounded(1, 1); - handle.push_low_priority(1u8).await.unwrap(); - handle.push_high_priority(2u8).await.unwrap(); - - let (prio1, frame1) = queues.recv().await.unwrap(); - let (prio2, frame2) = queues.recv().await.unwrap(); + handle + .push_low_priority(1u8) + .await + .expect("push low priority failed"); + handle + .push_high_priority(2u8) + .await + .expect("push high priority failed"); + + let (prio1, frame1) = queues.recv().await.expect("recv failed"); + let (prio2, frame2) = queues.recv().await.expect("recv failed"); assert_eq!(prio1, PushPriority::High); assert_eq!(frame1, 2); @@ -31,14 +37,20 @@ async fn frames_routed_to_correct_priority_queues() { async fn try_push_respects_policy() { let (mut queues, handle) = PushQueues::bounded(1, 1); - handle.push_high_priority(1u8).await.unwrap(); + handle + .push_high_priority(1u8) + .await + .expect("push high priority failed"); let result = handle.try_push(2u8, PushPriority::High, PushPolicy::ReturnErrorIfFull); assert!(result.is_err()); // drain queue to allow new push let _ = queues.recv().await; - handle.push_high_priority(3u8).await.unwrap(); - let (_, last) = queues.recv().await.unwrap(); + handle + .push_high_priority(3u8) + .await + .expect("push high priority failed"); + let (_, last) = 
queues.recv().await.expect("recv failed"); assert_eq!(last, 3); } @@ -65,11 +77,18 @@ async fn push_queues_error_on_closed() { #[tokio::test] async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap(); + let (mut queues, handle) = + PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); match priority { - PushPriority::High => handle.push_high_priority(1u8).await.unwrap(), - PushPriority::Low => handle.push_low_priority(1u8).await.unwrap(), + PushPriority::High => handle + .push_high_priority(1u8) + .await + .expect("push high priority failed"), + PushPriority::Low => handle + .push_low_priority(1u8) + .await + .expect("push low priority failed"), } let attempt = match priority { @@ -85,12 +104,18 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { time::advance(Duration::from_secs(1)).await; match priority { - PushPriority::High => handle.push_high_priority(3u8).await.unwrap(), - PushPriority::Low => handle.push_low_priority(3u8).await.unwrap(), + PushPriority::High => handle + .push_high_priority(3u8) + .await + .expect("push high priority failed"), + PushPriority::Low => handle + .push_low_priority(3u8) + .await + .expect("push low priority failed"), } - let (_, first) = queues.recv().await.unwrap(); - let (_, second) = queues.recv().await.unwrap(); + let (_, first) = queues.recv().await.expect("recv failed"); + let (_, second) = queues.recv().await.expect("recv failed"); assert_eq!((first, second), (1, 3)); } @@ -98,13 +123,20 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { #[tokio::test] async fn rate_limiter_allows_after_wait() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap(); - handle.push_high_priority(1u8).await.unwrap(); + let (mut queues, handle) = + PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue 
creation failed"); + handle + .push_high_priority(1u8) + .await + .expect("push high priority failed"); time::advance(Duration::from_secs(1)).await; - handle.push_high_priority(2u8).await.unwrap(); + handle + .push_high_priority(2u8) + .await + .expect("push high priority failed"); - let (_, a) = queues.recv().await.unwrap(); - let (_, b) = queues.recv().await.unwrap(); + let (_, a) = queues.recv().await.expect("recv failed"); + let (_, b) = queues.recv().await.expect("recv failed"); assert_eq!((a, b), (1, 2)); } @@ -114,17 +146,24 @@ async fn rate_limiter_allows_after_wait() { #[tokio::test] async fn rate_limiter_shared_across_priorities() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap(); - handle.push_high_priority(1u8).await.unwrap(); + let (mut queues, handle) = + PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); + handle + .push_high_priority(1u8) + .await + .expect("push high priority failed"); let attempt = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; assert!(attempt.is_err(), "second push should block across queues"); time::advance(Duration::from_secs(1)).await; - handle.push_low_priority(2u8).await.unwrap(); + handle + .push_low_priority(2u8) + .await + .expect("push low priority failed"); - let (prio1, frame1) = queues.recv().await.unwrap(); - let (prio2, frame2) = queues.recv().await.unwrap(); + let (prio1, frame1) = queues.recv().await.expect("recv failed"); + let (prio2, frame2) = queues.recv().await.expect("recv failed"); assert_eq!(prio1, PushPriority::High); assert_eq!(frame1, 1); assert_eq!(prio2, PushPriority::Low); @@ -136,12 +175,15 @@ async fn rate_limiter_shared_across_priorities() { async fn unlimited_queues_do_not_block() { time::pause(); let (mut queues, handle) = PushQueues::bounded_no_rate_limit(1, 1); - handle.push_high_priority(1u8).await.unwrap(); + handle + .push_high_priority(1u8) + .await + .expect("push high priority 
failed"); let res = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; assert!(res.is_ok(), "pushes should not block when unlimited"); - let (_, a) = queues.recv().await.unwrap(); - let (_, b) = queues.recv().await.unwrap(); + let (_, a) = queues.recv().await.expect("recv failed"); + let (_, b) = queues.recv().await.expect("recv failed"); assert_eq!((a, b), (1, 2)); } @@ -150,10 +192,14 @@ async fn unlimited_queues_do_not_block() { #[tokio::test] async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(4, 4, Some(3)).unwrap(); + let (mut queues, handle) = + PushQueues::bounded_with_rate(4, 4, Some(3)).expect("queue creation failed"); for i in 0u8..3 { - handle.push_high_priority(i).await.unwrap(); + handle + .push_high_priority(i) + .await + .expect("push high priority failed"); } let res = time::timeout(Duration::from_millis(10), handle.push_high_priority(99)).await; @@ -163,10 +209,13 @@ async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { ); time::advance(Duration::from_secs(1)).await; - handle.push_high_priority(100).await.unwrap(); + handle + .push_high_priority(100) + .await + .expect("push high priority failed"); for expected in [0u8, 1u8, 2u8, 100u8] { - let (_, frame) = queues.recv().await.unwrap(); + let (_, frame) = queues.recv().await.expect("recv failed"); assert_eq!(frame, expected); } } diff --git a/tests/push_policies.rs b/tests/push_policies.rs index e0173bc6..68bfec99 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -40,10 +40,15 @@ fn push_policy_behaviour( while logger.pop().is_some() {} let (mut queues, handle) = PushQueues::bounded(1, 1); - handle.push_high_priority(1u8).await.unwrap(); - handle.try_push(2u8, PushPriority::High, policy).unwrap(); + handle + .push_high_priority(1u8) + .await + .expect("push high priority failed"); + handle + .try_push(2u8, PushPriority::High, policy) + 
.expect("try_push failed"); - let (_, val) = queues.recv().await.unwrap(); + let (_, val) = queues.recv().await.expect("recv failed"); assert_eq!(val, 1); assert!( timeout(Duration::from_millis(20), queues.recv()) @@ -71,23 +76,26 @@ fn push_policy_behaviour( fn dropped_frame_goes_to_dlq(rt: Runtime) { rt.block_on(async { let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - let (mut queues, handle) = - PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); + let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)) + .expect("queue creation failed"); - handle.push_high_priority(1u8).await.unwrap(); + handle + .push_high_priority(1u8) + .await + .expect("push high priority failed"); handle .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) - .unwrap(); + .expect("try_push failed"); - let (_, val) = queues.recv().await.unwrap(); + let (_, val) = queues.recv().await.expect("recv failed"); assert_eq!(val, 1); - assert_eq!(dlq_rx.recv().await.unwrap(), 2); + assert_eq!(dlq_rx.recv().await.expect("dlq recv failed"), 2); }); } /// Preloads the DLQ to simulate a full queue. fn fill_dlq(tx: &mpsc::Sender, _rx: &mut Option>) { - tx.try_send(99).unwrap(); + tx.try_send(99).expect("send failed"); } /// Drops the receiver to simulate a closed DLQ channel. 
@@ -97,7 +105,7 @@ fn close_dlq(_: &mpsc::Sender, rx: &mut Option>) { drop(r fn assert_dlq_full(rx: &mut Option>) -> BoxFuture<'_, ()> { Box::pin(async move { let receiver = rx.as_mut().expect("receiver missing"); - assert_eq!(receiver.recv().await.unwrap(), 99); + assert_eq!(receiver.recv().await.expect("dlq recv failed"), 99); assert!(receiver.try_recv().is_err()); }) } @@ -127,13 +135,18 @@ fn dlq_error_scenarios( let (dlq_tx, dlq_rx) = mpsc::channel(1); let mut dlq_rx = Some(dlq_rx); setup(&dlq_tx, &mut dlq_rx); - let (mut queues, handle) = - PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); + let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)) + .expect("queue creation failed"); - handle.push_high_priority(1u8).await.unwrap(); - handle.try_push(2u8, PushPriority::High, policy).unwrap(); + handle + .push_high_priority(1u8) + .await + .expect("push high priority failed"); + handle + .try_push(2u8, PushPriority::High, policy) + .expect("try_push failed"); - let (_, val) = queues.recv().await.unwrap(); + let (_, val) = queues.recv().await.expect("recv failed"); assert_eq!(val, 1); assertion(&mut dlq_rx).await; diff --git a/tests/response.rs b/tests/response.rs index 2ad51051..684ba693 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -40,17 +40,22 @@ impl<'de> bincode::BorrowDecode<'de, ()> for FailingResp { /// and that the response can be decoded and deserialised back to its original value asynchronously. 
async fn send_response_encodes_and_frames() { let app = WireframeApp::new() - .unwrap() + .expect("failed to create app") .frame_processor(LengthPrefixedProcessor::default()) .serializer(BincodeSerializer); let mut out = Vec::new(); - app.send_response(&mut out, &TestResp(7)).await.unwrap(); + app.send_response(&mut out, &TestResp(7)) + .await + .expect("send_response failed"); let processor = LengthPrefixedProcessor::default(); let mut buf = BytesMut::from(&out[..]); - let frame = processor.decode(&mut buf).unwrap().unwrap(); - let (decoded, _) = TestResp::from_bytes(&frame).unwrap(); + let frame = processor + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); + let (decoded, _) = TestResp::from_bytes(&frame).expect("deserialize failed"); assert_eq!(decoded, TestResp(7)); } @@ -62,7 +67,7 @@ async fn send_response_encodes_and_frames() { async fn length_prefixed_decode_requires_complete_header() { let processor = LengthPrefixedProcessor::default(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes - assert!(processor.decode(&mut buf).unwrap().is_none()); + assert!(processor.decode(&mut buf).expect("decode failed").is_none()); assert_eq!(buf.len(), 3); // nothing consumed } @@ -74,7 +79,7 @@ async fn length_prefixed_decode_requires_complete_header() { async fn length_prefixed_decode_requires_full_frame() { let processor = LengthPrefixedProcessor::default(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); - assert!(processor.decode(&mut buf).unwrap().is_none()); + assert!(processor.decode(&mut buf).expect("decode failed").is_none()); // buffer should retain bytes since frame isn't complete assert_eq!(buf.len(), 6); } @@ -115,16 +120,19 @@ fn custom_length_roundtrip( ) { let processor = LengthPrefixedProcessor::new(fmt); let mut buf = BytesMut::new(); - processor.encode(&frame, &mut buf).unwrap(); + processor.encode(&frame, &mut buf).expect("encode failed"); assert_eq!(&buf[..prefix.len()], 
&prefix[..]); - let decoded = processor.decode(&mut buf).unwrap().unwrap(); + let decoded = processor + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); assert_eq!(decoded, frame); } #[tokio::test] async fn send_response_propagates_write_error() { let app = WireframeApp::new() - .unwrap() + .expect("route registration failed") .frame_processor(LengthPrefixedProcessor::default()); let mut writer = FailingWriter; @@ -182,7 +190,7 @@ fn encode_fails_for_length_too_large(#[case] fmt: LengthFormat, #[case] len: usi /// This test sends a `FailingResp` using `send_response` and asserts that the resulting /// error is of the `Serialize` variant, indicating a failure during response encoding. async fn send_response_returns_encode_error() { - let app = WireframeApp::new().unwrap(); + let app = WireframeApp::new().expect("failed to create app"); let err = app .send_response(&mut Vec::new(), &FailingResp) .await diff --git a/tests/routes.rs b/tests/routes.rs index 7184fc5e..dbaa1652 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -41,7 +41,7 @@ async fn handler_receives_message_and_echoes_response() { let called = Arc::new(AtomicUsize::new(0)); let called_clone = called.clone(); let app = WireframeApp::<_, _, TestEnvelope>::new_with_envelope() - .unwrap() + .expect("failed to create app") .frame_processor(LengthPrefixedProcessor::default()) .route( 1, @@ -53,8 +53,8 @@ async fn handler_receives_message_and_echoes_response() { }) }), ) - .unwrap(); - let msg_bytes = Echo(42).to_bytes().unwrap(); + .expect("route registration failed"); + let msg_bytes = Echo(42).to_bytes().expect("encode failed"); let env = TestEnvelope { id: 1, msg: msg_bytes, @@ -67,12 +67,12 @@ async fn handler_receives_message_and_echoes_response() { let mut buf = BytesMut::from(&out[..]); let frame = LengthPrefixedProcessor::default() .decode(&mut buf) - .unwrap() - .unwrap(); + .expect("decode failed") + .expect("frame missing"); let (resp_env, _) = BincodeSerializer 
.deserialize::(&frame) - .unwrap(); - let (echo, _) = Echo::from_bytes(&resp_env.msg).unwrap(); + .expect("deserialize failed"); + let (echo, _) = Echo::from_bytes(&resp_env.msg).expect("decode echo failed"); assert_eq!(echo, Echo(42)); assert_eq!(called.load(Ordering::SeqCst), 1); } @@ -80,26 +80,28 @@ async fn handler_receives_message_and_echoes_response() { #[tokio::test] async fn multiple_frames_processed_in_sequence() { let app = WireframeApp::<_, _, TestEnvelope>::new_with_envelope() - .unwrap() + .expect("failed to create app") .frame_processor(LengthPrefixedProcessor::default()) .route( 1, std::sync::Arc::new(|_: &TestEnvelope| Box::pin(async {})), ) - .unwrap(); + .expect("route registration failed"); let frames: Vec> = (1u8..=2) .map(|id| { - let msg_bytes = Echo(id).to_bytes().unwrap(); + let msg_bytes = Echo(id).to_bytes().expect("encode failed"); let env = TestEnvelope { id: 1, msg: msg_bytes, }; - let env_bytes = BincodeSerializer.serialize(&env).unwrap(); + let env_bytes = BincodeSerializer + .serialize(&env) + .expect("serialization failed"); let mut framed = BytesMut::new(); LengthPrefixedProcessor::default() .encode(&env_bytes, &mut framed) - .unwrap(); + .expect("encode failed"); framed.to_vec() }) .collect(); @@ -111,20 +113,20 @@ async fn multiple_frames_processed_in_sequence() { let mut buf = BytesMut::from(&out[..]); let first = LengthPrefixedProcessor::default() .decode(&mut buf) - .unwrap() - .unwrap(); + .expect("decode failed") + .expect("frame missing"); let (env1, _) = BincodeSerializer .deserialize::(&first) - .unwrap(); - let (echo1, _) = Echo::from_bytes(&env1.msg).unwrap(); + .expect("deserialize failed"); + let (echo1, _) = Echo::from_bytes(&env1.msg).expect("decode echo failed"); let second = LengthPrefixedProcessor::default() .decode(&mut buf) - .unwrap() - .unwrap(); + .expect("decode failed") + .expect("frame missing"); let (env2, _) = BincodeSerializer .deserialize::(&second) - .unwrap(); - let (echo2, _) = 
Echo::from_bytes(&env2.msg).unwrap(); + .expect("deserialize failed"); + let (echo2, _) = Echo::from_bytes(&env2.msg).expect("decode echo failed"); assert_eq!(echo1, Echo(1)); assert_eq!(echo2, Echo(2)); } diff --git a/tests/server.rs b/tests/server.rs index 3eda6da0..14559199 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -42,10 +42,14 @@ async fn readiness_receiver_dropped() { let factory = || WireframeApp::new().expect("WireframeApp::new failed"); let server = WireframeServer::new(factory) .workers(1) - .bind("127.0.0.1:0".parse().unwrap()) - .unwrap(); + .bind( + "127.0.0.1:0" + .parse() + .expect("hard-coded socket address must be valid"), + ) + .expect("bind failed"); - let addr = server.local_addr().unwrap(); + let addr = server.local_addr().expect("local addr missing"); // Create channel and immediately drop receiver to force send failure let (tx_ready, rx_ready) = oneshot::channel(); drop(rx_ready); @@ -55,7 +59,7 @@ async fn readiness_receiver_dropped() { .ready_signal(tx_ready) .run_with_shutdown(tokio::time::sleep(Duration::from_millis(200))) .await - .unwrap(); + .expect("server run failed"); }); // Wait briefly to ensure server attempted to send readiness signal diff --git a/tests/session_registry.rs b/tests/session_registry.rs index 1f88e68f..12cede5e 100644 --- a/tests/session_registry.rs +++ b/tests/session_registry.rs @@ -31,8 +31,8 @@ async fn handle_retrieved_while_alive( registry.insert(id, &handle); let retrieved = registry.get(&id).expect("handle should be present"); - retrieved.push_high_priority(7).await.unwrap(); - let (_, val) = queues.recv().await.unwrap(); + retrieved.push_high_priority(7).await.expect("push failed"); + let (_, val) = queues.recv().await.expect("recv failed"); assert_eq!(val, 7); } diff --git a/tests/wireframe_protocol.rs b/tests/wireframe_protocol.rs index e13e7934..79f107dc 100644 --- a/tests/wireframe_protocol.rs +++ b/tests/wireframe_protocol.rs @@ -51,7 +51,9 @@ async fn builder_produces_protocol_hooks() { 
let protocol = TestProtocol { counter: counter.clone(), }; - let app = WireframeApp::new().unwrap().with_protocol(protocol); + let app = WireframeApp::new() + .expect("failed to create app") + .with_protocol(protocol); let mut hooks = app.protocol_hooks(); let (queues, handle) = PushQueues::bounded(1, 1); @@ -73,11 +75,16 @@ async fn connection_actor_uses_protocol_from_builder() { let protocol = TestProtocol { counter: counter.clone(), }; - let app = WireframeApp::new().unwrap().with_protocol(protocol); + let app = WireframeApp::new() + .expect("failed to create app") + .with_protocol(protocol); let hooks = app.protocol_hooks(); let (queues, handle) = PushQueues::bounded(8, 8); - handle.push_high_priority(vec![1]).await.unwrap(); + handle + .push_high_priority(vec![1]) + .await + .expect("push failed"); let stream = stream::iter(vec![Ok(vec![2u8])]); let mut actor: ConnectionActor<_, ()> = ConnectionActor::with_hooks( queues, @@ -87,7 +94,7 @@ async fn connection_actor_uses_protocol_from_builder() { hooks, ); let mut out = Vec::new(); - actor.run(&mut out).await.unwrap(); + actor.run(&mut out).await.expect("actor run failed"); assert_eq!(out, vec![vec![1, 1], vec![2, 1]]); assert_eq!(counter.load(Ordering::SeqCst), 2); diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index cc3c81da..726a57cd 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -454,7 +454,7 @@ where let mut buf = Vec::new(); client.read_to_end(&mut buf).await?; - server_task.await.unwrap(); + server_task.await.expect("server task panicked"); Ok(buf) } From 8e2fe470a3289968fd310f9558e8f4c71136f84a Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 2 Aug 2025 09:26:34 +0100 Subject: [PATCH 02/10] Add expect macros and document infallible call --- docs/wireframe-testing-crate.md | 11 ++++ src/push.rs | 8 ++- tests/push.rs | 100 +++++++++---------------------- wireframe_testing/src/helpers.rs | 32 ++++++++++ 4 files changed, 77 
insertions(+), 74 deletions(-) diff --git a/docs/wireframe-testing-crate.md b/docs/wireframe-testing-crate.md index 3c00bb49..dc19cbe4 100644 --- a/docs/wireframe-testing-crate.md +++ b/docs/wireframe-testing-crate.md @@ -120,6 +120,17 @@ let bytes = drive_with_bincode(app, Ping(1)).await.unwrap(); assert_eq!(bytes, [0, 1]); ``` +### Helper macros + +Two small macros, `push_expect!` and `recv_expect!`, reduce boilerplate in test +code. They await a future and panic with a message including the call site when +the future resolves to an error. + +```rust +push_expect!(handle.push_high_priority(42)); +let (_, frame) = recv_expect!(queues.recv()); +``` + ## Example Usage ```rust diff --git a/src/push.rs b/src/push.rs index c25abff0..46ede289 100644 --- a/src/push.rs +++ b/src/push.rs @@ -309,8 +309,12 @@ impl PushQueues { high_capacity: usize, low_capacity: usize, ) -> (Self, PushHandle) { - Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None) - .expect("bounded_no_rate_limit should not fail") + // `bounded_with_rate_dlq` only fails when given an invalid rate. Passing + // `None` disables rate limiting entirely so the call is infallible. The + // debug assertion guards against future regressions. + let result = Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None); + debug_assert!(result.is_ok(), "bounded_no_rate_limit should not fail"); + result.expect("bounded_no_rate_limit should not fail") } /// Create queues with a custom rate limit in pushes per second. diff --git a/tests/push.rs b/tests/push.rs index 852aad7d..ed3f991b 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -5,23 +5,18 @@ use rstest::rstest; use tokio::time::{self, Duration}; use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues}; +use wireframe_testing::{push_expect, recv_expect}; /// Frames are delivered to queues matching their push priority. 
#[tokio::test] async fn frames_routed_to_correct_priority_queues() { let (mut queues, handle) = PushQueues::bounded(1, 1); - handle - .push_low_priority(1u8) - .await - .expect("push low priority failed"); - handle - .push_high_priority(2u8) - .await - .expect("push high priority failed"); + push_expect!(handle.push_low_priority(1u8)); + push_expect!(handle.push_high_priority(2u8)); - let (prio1, frame1) = queues.recv().await.expect("recv failed"); - let (prio2, frame2) = queues.recv().await.expect("recv failed"); + let (prio1, frame1) = recv_expect!(queues.recv()); + let (prio2, frame2) = recv_expect!(queues.recv()); assert_eq!(prio1, PushPriority::High); assert_eq!(frame1, 2); @@ -37,20 +32,14 @@ async fn frames_routed_to_correct_priority_queues() { async fn try_push_respects_policy() { let (mut queues, handle) = PushQueues::bounded(1, 1); - handle - .push_high_priority(1u8) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1u8)); let result = handle.try_push(2u8, PushPriority::High, PushPolicy::ReturnErrorIfFull); assert!(result.is_err()); // drain queue to allow new push let _ = queues.recv().await; - handle - .push_high_priority(3u8) - .await - .expect("push high priority failed"); - let (_, last) = queues.recv().await.expect("recv failed"); + push_expect!(handle.push_high_priority(3u8)); + let (_, last) = recv_expect!(queues.recv()); assert_eq!(last, 3); } @@ -81,14 +70,8 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); match priority { - PushPriority::High => handle - .push_high_priority(1u8) - .await - .expect("push high priority failed"), - PushPriority::Low => handle - .push_low_priority(1u8) - .await - .expect("push low priority failed"), + PushPriority::High => push_expect!(handle.push_high_priority(1u8)), + PushPriority::Low => push_expect!(handle.push_low_priority(1u8)), } let attempt = match priority { 
@@ -104,18 +87,12 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { time::advance(Duration::from_secs(1)).await; match priority { - PushPriority::High => handle - .push_high_priority(3u8) - .await - .expect("push high priority failed"), - PushPriority::Low => handle - .push_low_priority(3u8) - .await - .expect("push low priority failed"), + PushPriority::High => push_expect!(handle.push_high_priority(3u8)), + PushPriority::Low => push_expect!(handle.push_low_priority(3u8)), } - let (_, first) = queues.recv().await.expect("recv failed"); - let (_, second) = queues.recv().await.expect("recv failed"); + let (_, first) = recv_expect!(queues.recv()); + let (_, second) = recv_expect!(queues.recv()); assert_eq!((first, second), (1, 3)); } @@ -125,18 +102,12 @@ async fn rate_limiter_allows_after_wait() { time::pause(); let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); - handle - .push_high_priority(1u8) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1u8)); time::advance(Duration::from_secs(1)).await; - handle - .push_high_priority(2u8) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(2u8)); - let (_, a) = queues.recv().await.expect("recv failed"); - let (_, b) = queues.recv().await.expect("recv failed"); + let (_, a) = recv_expect!(queues.recv()); + let (_, b) = recv_expect!(queues.recv()); assert_eq!((a, b), (1, 2)); } @@ -148,22 +119,16 @@ async fn rate_limiter_shared_across_priorities() { time::pause(); let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); - handle - .push_high_priority(1u8) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1u8)); let attempt = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; assert!(attempt.is_err(), "second push should block across queues"); 
time::advance(Duration::from_secs(1)).await; - handle - .push_low_priority(2u8) - .await - .expect("push low priority failed"); + push_expect!(handle.push_low_priority(2u8)); - let (prio1, frame1) = queues.recv().await.expect("recv failed"); - let (prio2, frame2) = queues.recv().await.expect("recv failed"); + let (prio1, frame1) = recv_expect!(queues.recv()); + let (prio2, frame2) = recv_expect!(queues.recv()); assert_eq!(prio1, PushPriority::High); assert_eq!(frame1, 1); assert_eq!(prio2, PushPriority::Low); @@ -175,15 +140,12 @@ async fn rate_limiter_shared_across_priorities() { async fn unlimited_queues_do_not_block() { time::pause(); let (mut queues, handle) = PushQueues::bounded_no_rate_limit(1, 1); - handle - .push_high_priority(1u8) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1u8)); let res = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; assert!(res.is_ok(), "pushes should not block when unlimited"); - let (_, a) = queues.recv().await.expect("recv failed"); - let (_, b) = queues.recv().await.expect("recv failed"); + let (_, a) = recv_expect!(queues.recv()); + let (_, b) = recv_expect!(queues.recv()); assert_eq!((a, b), (1, 2)); } @@ -196,10 +158,7 @@ async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { PushQueues::bounded_with_rate(4, 4, Some(3)).expect("queue creation failed"); for i in 0u8..3 { - handle - .push_high_priority(i) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(i)); } let res = time::timeout(Duration::from_millis(10), handle.push_high_priority(99)).await; @@ -209,13 +168,10 @@ async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { ); time::advance(Duration::from_secs(1)).await; - handle - .push_high_priority(100) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(100)); for expected in [0u8, 1u8, 2u8, 100u8] { - let (_, frame) = 
queues.recv().await.expect("recv failed"); + let (_, frame) = recv_expect!(queues.recv()); assert_eq!(frame, expected); } } diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 726a57cd..13749250 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -486,3 +486,35 @@ where let (_client, server) = duplex(64); app.handle_connection(server).await; } + +/// Await the provided future and panic with context on failure. +/// +/// In debug builds, the generated message includes the call site for easier +/// troubleshooting. +#[macro_export] +macro_rules! push_expect { + ($fut:expr) => {{ + $fut.await + .expect(concat!("push failed at ", file!(), ":", line!())) + }}; + ($fut:expr, $msg:expr) => {{ + let m = ::std::format!("{msg} at {}:{}", file!(), line!(), msg = $msg); + $fut.await.expect(&m) + }}; +} + +/// Await the provided future and panic with context on failure. +/// +/// In debug builds, the generated message includes the call site for easier +/// troubleshooting. +#[macro_export] +macro_rules! recv_expect { + ($fut:expr) => {{ + $fut.await + .expect(concat!("recv failed at ", file!(), ":", line!())) + }}; + ($fut:expr, $msg:expr) => {{ + let m = ::std::format!("{msg} at {}:{}", file!(), line!(), msg = $msg); + $fut.await.expect(&m) + }}; +} From 23a2bbe9ee2684416fbd6150242a649185cc295e Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 02:17:32 +0100 Subject: [PATCH 03/10] Use push_expect macros in connection actor tests Replace verbose await/expect pairs with the push_expect! helper to standardise low- and high-priority push failure messages. Rewriting let-chain expressions to nested conditionals keeps the build on stable Rust, and lint expectations were relaxed accordingly. 
--- src/connection.rs | 37 +++++++++++------- src/push.rs | 16 +++++--- src/server.rs | 4 +- tests/connection_actor.rs | 80 ++++++++++++++------------------------- 4 files changed, 66 insertions(+), 71 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 4fdb1a5d..82213ee2 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -40,7 +40,9 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } +pub fn active_connection_count() -> u64 { + ACTIVE_CONNECTIONS.load(Ordering::Relaxed) +} use crate::{ fairness::Fairness, @@ -185,11 +187,15 @@ where pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness.set_config(fairness); } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { self.response = stream; } + pub fn set_response(&mut self, stream: Option>) { + self.response = stream; + } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } + pub fn shutdown_token(&self) -> CancellationToken { + self.shutdown.clone() + } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -402,11 +408,6 @@ where out.push(frame); self.after_low(); } - Err(mpsc::error::TryRecvError::Empty) => {} - Err(mpsc::error::TryRecvError::Disconnected) => { - self.low_rx = None; - state.mark_closed(); - } } } } @@ -449,11 +450,15 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + async fn wait_shutdown(token: CancellationToken) { + token.cancelled_owned().await; + } /// Receive the next frame from a push queue. 
#[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { + rx.recv().await + } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -536,11 +541,17 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } + fn is_active(&self) -> bool { + matches!(self.run_state, RunState::Active) + } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } + fn is_shutting_down(&self) -> bool { + matches!(self.run_state, RunState::ShuttingDown) + } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } + fn is_done(&self) -> bool { + matches!(self.run_state, RunState::Finished) + } } diff --git a/src/push.rs b/src/push.rs index 46ede289..74d1aa35 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,7 +97,9 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } + pub(crate) fn from_arc(arc: Arc>) -> Self { + Self(arc) + } /// Internal helper to push a frame with the requested priority. /// @@ -253,7 +255,9 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } + pub(crate) fn downgrade(&self) -> Weak> { + Arc::downgrade(&self.0) + } } /// Receiver ends of the push queues stored by the connection actor. 
@@ -387,10 +391,10 @@ impl PushQueues { rate: Option, dlq: Option>, ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate - && (r == 0 || r > MAX_PUSH_RATE) - { - return Err(PushConfigError::InvalidRate(r)); + if let Some(r) = rate { + if r == 0 || r > MAX_PUSH_RATE { + return Err(PushConfigError::InvalidRate(r)); + } } let (high_tx, high_rx) = mpsc::channel(high_capacity); let (low_tx, low_rx) = mpsc::channel(low_capacity); diff --git a/src/server.rs b/src/server.rs index 572711f1..d16c3235 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,7 +229,9 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { self.workers } + pub const fn worker_count(&self) -> usize { + self.workers + } /// Get the socket address the server is bound to, if available. #[must_use] diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index 867fe451..2c5e5e3d 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -15,27 +15,34 @@ use wireframe::{ push::PushQueues, response::{FrameStream, WireframeError}, }; +use wireframe_testing::push_expect; #[fixture] #[allow( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn queues() -> (PushQueues, wireframe::push::PushHandle) { PushQueues::bounded(8, 8) } +fn queues() -> (PushQueues, wireframe::push::PushHandle) { + PushQueues::bounded(8, 8) +} #[fixture] #[allow( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn shutdown_token() -> CancellationToken { CancellationToken::new() } +fn shutdown_token() -> CancellationToken { + CancellationToken::new() +} #[fixture] #[allow( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn empty_stream() -> Option> { None } +fn empty_stream() -> Option> { + None +} #[rstest] #[tokio::test] @@ -45,14 +52,8 @@ async fn strict_priority_order( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - handle - 
.push_low_priority(2) - .await - .expect("push low priority failed"); - handle - .push_high_priority(1) - .await - .expect("push high priority failed"); + push_expect!(handle.push_low_priority(2), "push low-priority"); + push_expect!(handle.push_high_priority(1), "push high-priority"); let stream = stream::iter(vec![Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = @@ -76,15 +77,9 @@ async fn fairness_yields_low_after_burst( }; for n in 1..=5 { - handle - .push_high_priority(n) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(n), "push high-priority"); } - handle - .push_low_priority(99) - .await - .expect("push low priority failed"); + push_expect!(handle.push_low_priority(99), "push low-priority"); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown_token); @@ -116,14 +111,18 @@ async fn queue_frames( for priority in order { match priority { Priority::High => { - let msg = format!("failed to push high-priority frame {next_high}"); - handle.push_high_priority(next_high).await.expect(&msg); + push_expect!( + handle.push_high_priority(next_high), + format!("push high-priority frame {next_high}") + ); highs.push(next_high); next_high += 1; } Priority::Low => { - let msg = format!("failed to push low-priority frame {next_low}"); - handle.push_low_priority(next_low).await.expect(&msg); + push_expect!( + handle.push_low_priority(next_low), + format!("push low-priority frame {next_low}") + ); lows.push(next_low); next_low += 1; } @@ -210,25 +209,13 @@ async fn fairness_yields_low_with_time_slice( let _ = tx.send(out); }); - handle - .push_high_priority(1) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1), "push high-priority"); tokio::time::advance(Duration::from_millis(5)).await; - handle - .push_high_priority(2) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(2), "push high-priority"); 
tokio::time::advance(Duration::from_millis(15)).await; - handle - .push_low_priority(42) - .await - .expect("push low priority failed"); + push_expect!(handle.push_low_priority(42), "push low-priority"); for n in 3..=5 { - handle - .push_high_priority(n) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(n), "push high-priority"); } drop(handle); @@ -269,10 +256,7 @@ async fn complete_draining_of_sources( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - handle - .push_high_priority(1) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1), "push high-priority"); let stream = stream::iter(vec![Ok(2u8), Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = @@ -403,10 +387,7 @@ async fn interleaved_shutdown_during_stream( #[serial] async fn push_queue_exhaustion_backpressure() { let (mut queues, handle) = PushQueues::bounded(1, 1); - handle - .push_high_priority(1) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1), "push high-priority"); let blocked = timeout(Duration::from_millis(50), handle.push_high_priority(2)).await; assert!(blocked.is_err()); @@ -432,10 +413,7 @@ async fn before_send_hook_modifies_frames( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - handle - .push_high_priority(1) - .await - .expect("push high priority failed"); + push_expect!(handle.push_high_priority(1), "push high-priority"); let stream = stream::iter(vec![Ok(2u8)]); let hooks = ProtocolHooks { From c517474fec85faaacf9f0196ad0241754a518a71 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 14:22:23 +0100 Subject: [PATCH 04/10] Apply formatting --- src/connection.rs | 32 ++++++++------------------------ src/push.rs | 8 ++------ src/server.rs | 4 +--- tests/connection_actor.rs | 12 +++--------- 4 files changed, 14 insertions(+), 42 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 82213ee2..55f7b7eb 
100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -40,9 +40,7 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { - ACTIVE_CONNECTIONS.load(Ordering::Relaxed) -} +pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ fairness::Fairness, @@ -187,15 +185,11 @@ where pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness.set_config(fairness); } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { - self.response = stream; - } + pub fn set_response(&mut self, stream: Option>) { self.response = stream; } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { - self.shutdown.clone() - } + pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -450,15 +444,11 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { - token.cancelled_owned().await; - } + async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { - rx.recv().await - } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -541,17 +531,11 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { - matches!(self.run_state, RunState::Active) - } + fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } /// Returns `true` once shutdown has begun. 
- fn is_shutting_down(&self) -> bool { - matches!(self.run_state, RunState::ShuttingDown) - } + fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { - matches!(self.run_state, RunState::Finished) - } + fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } } diff --git a/src/push.rs b/src/push.rs index 74d1aa35..491a3dfc 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,9 +97,7 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { - Self(arc) - } + pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } /// Internal helper to push a frame with the requested priority. /// @@ -255,9 +253,7 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { - Arc::downgrade(&self.0) - } + pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } } /// Receiver ends of the push queues stored by the connection actor. diff --git a/src/server.rs b/src/server.rs index d16c3235..572711f1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,9 +229,7 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { - self.workers - } + pub const fn worker_count(&self) -> usize { self.workers } /// Get the socket address the server is bound to, if available. 
#[must_use] diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index 2c5e5e3d..ef1dbe79 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -22,27 +22,21 @@ use wireframe_testing::push_expect; unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn queues() -> (PushQueues, wireframe::push::PushHandle) { - PushQueues::bounded(8, 8) -} +fn queues() -> (PushQueues, wireframe::push::PushHandle) { PushQueues::bounded(8, 8) } #[fixture] #[allow( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn shutdown_token() -> CancellationToken { - CancellationToken::new() -} +fn shutdown_token() -> CancellationToken { CancellationToken::new() } #[fixture] #[allow( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn empty_stream() -> Option> { - None -} +fn empty_stream() -> Option> { None } #[rstest] #[tokio::test] From cbb7f90f3ef57e075b0f119021638b01692bc94c Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 15:14:28 +0100 Subject: [PATCH 05/10] Handle low queue TryRecvError and fix test message (#252) * Handle missing TryRecvError and fix test message * Apply formatting --- examples/metadata_routing.rs | 5 +---- src/connection.rs | 26 +++++++++++++++++--------- src/server.rs | 13 +++++-------- tests/response.rs | 2 +- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index eb53e3d0..9d2e7ead 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,10 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[derive(bincode::Decode, bincode::Encode)] -#[expect( - dead_code, - reason = "placeholder for demonstration of metadata routing" -)] +// Placeholder for demonstration of metadata routing; not used directly. 
struct Pong; #[tokio::main] diff --git a/src/connection.rs b/src/connection.rs index 55f7b7eb..ae5c3b8d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -12,7 +12,10 @@ use std::{ }; use futures::StreamExt; -use tokio::{sync::mpsc, time::Duration}; +use tokio::{ + sync::mpsc::{self, error::TryRecvError}, + time::Duration, +}; use tokio_util::sync::CancellationToken; use tracing::{info, info_span, warn}; @@ -393,14 +396,19 @@ where fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { self.fairness.after_high(); - if self.fairness.should_yield() - && let Some(rx) = &mut self.low_rx - { - match rx.try_recv() { - Ok(mut frame) => { - self.hooks.before_send(&mut frame, &mut self.ctx); - out.push(frame); - self.after_low(); + if self.fairness.should_yield() { + let res = self.low_rx.as_mut().map(mpsc::Receiver::try_recv); + if let Some(res) = res { + match res { + Ok(mut frame) => { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + self.after_low(); + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => { + Self::handle_closed_receiver(&mut self.low_rx, state); + } } } } diff --git a/src/server.rs b/src/server.rs index 572711f1..b34942d0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -507,10 +507,10 @@ async fn process_stream( let peer_addr = stream.peer_addr().ok(); match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { - if let Some(handler) = on_success.as_ref() - && let Err(e) = handler(&preamble, &mut stream).await - { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + if let Some(handler) = on_success.as_ref() { + if let Err(e) = handler(&preamble, &mut stream).await { + tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + } } let stream = RewindStream::new(leftover, stream); // Hand the connection to the application for processing. @@ -558,10 +558,7 @@ mod tests { /// Test helper preamble carrying no data. 
#[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[expect( - dead_code, - reason = "used only in doctest to illustrate an empty preamble" - )] + // Used only in doctest to illustrate an empty preamble. struct EmptyPreamble; #[fixture] diff --git a/tests/response.rs b/tests/response.rs index 684ba693..20a5857c 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -132,7 +132,7 @@ fn custom_length_roundtrip( #[tokio::test] async fn send_response_propagates_write_error() { let app = WireframeApp::new() - .expect("route registration failed") + .expect("app creation failed") .frame_processor(LengthPrefixedProcessor::default()); let mut writer = FailingWriter; From a31f47532f4002042cf975a1049e3a0b9249fd4e Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 15:58:32 +0100 Subject: [PATCH 06/10] Collapse nested conditionals (#253) --- src/push.rs | 16 ++++++++++------ src/server.rs | 13 ++++++++++--- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/push.rs b/src/push.rs index 491a3dfc..6712da33 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,7 +97,9 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } + pub(crate) fn from_arc(arc: Arc>) -> Self { + Self(arc) + } /// Internal helper to push a frame with the requested priority. /// @@ -253,7 +255,9 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } + pub(crate) fn downgrade(&self) -> Weak> { + Arc::downgrade(&self.0) + } } /// Receiver ends of the push queues stored by the connection actor. 
@@ -387,10 +391,10 @@ impl PushQueues { rate: Option, dlq: Option>, ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate { - if r == 0 || r > MAX_PUSH_RATE { - return Err(PushConfigError::InvalidRate(r)); - } + if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) { + // Reject unsupported rates early to avoid building queues that cannot + // be used. The bounds prevent runaway resource consumption. + return Err(PushConfigError::InvalidRate(r)); } let (high_tx, high_rx) = mpsc::channel(high_capacity); let (low_tx, low_rx) = mpsc::channel(low_capacity); diff --git a/src/server.rs b/src/server.rs index b34942d0..bb983b1f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,7 +229,9 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { self.workers } + pub const fn worker_count(&self) -> usize { + self.workers + } /// Get the socket address the server is bound to, if available. #[must_use] @@ -508,8 +510,13 @@ async fn process_stream( match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { if let Some(handler) = on_success.as_ref() { - if let Err(e) = handler(&preamble, &mut stream).await { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + match handler(&preamble, &mut stream).await { + Ok(()) => {} + Err(e) => { + // Log and continue processing if the callback fails; connection + // handling should not halt due to diagnostic hooks. 
+ tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + } } } let stream = RewindStream::new(leftover, stream); From 24f51370559f4c6620c800e7ecc04b2afda9b7b1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 16:06:41 +0100 Subject: [PATCH 07/10] Remove unneeded carriage returns --- src/server.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/server.rs b/src/server.rs index bb983b1f..f2c245df 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,9 +229,7 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { - self.workers - } + pub const fn worker_count(&self) -> usize { self.workers } /// Get the socket address the server is bound to, if available. #[must_use] From ada295d5da7c9be792bd920a79c86a9c247eaca3 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 16:08:00 +0100 Subject: [PATCH 08/10] Remove unneeded carriage returns --- src/push.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/push.rs b/src/push.rs index 6712da33..99c6289b 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,9 +97,7 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { - Self(arc) - } + pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } /// Internal helper to push a frame with the requested priority. /// @@ -255,9 +253,7 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { - Arc::downgrade(&self.0) - } + pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } } /// Receiver ends of the push queues stored by the connection actor.
From f68d91c0c0e4c9a40a1015f592d5af23fd5d12ec Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 16:22:44 +0100 Subject: [PATCH 09/10] Remove unused struct --- examples/metadata_routing.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 9d2e7ead..8fa7e5b1 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -59,10 +59,6 @@ impl FrameMetadata for HeaderSerializer { #[derive(bincode::Decode, bincode::Encode)] struct Ping; -#[derive(bincode::Decode, bincode::Encode)] -// Placeholder for demonstration of metadata routing; not used directly. -struct Pong; - #[tokio::main] async fn main() -> io::Result<()> { let app = WireframeApp::new() From 9355c363f77fc8ddf6e874753bc062f9b0b686be Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 16:23:45 +0100 Subject: [PATCH 10/10] Remove unused struct --- src/server.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/server.rs b/src/server.rs index f2c245df..e5b1d4b8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -561,11 +561,6 @@ mod tests { message: String, } - /// Test helper preamble carrying no data. - #[derive(Debug, Clone, PartialEq, Encode, Decode)] - // Used only in doctest to illustrate an empty preamble. - struct EmptyPreamble; - #[fixture] fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { || WireframeApp::default()