diff --git a/crates/comenqd/src/daemon.rs b/crates/comenqd/src/daemon.rs index 434cd24..bcdb37b 100644 --- a/crates/comenqd/src/daemon.rs +++ b/crates/comenqd/src/daemon.rs @@ -24,9 +24,11 @@ pub use crate::util::is_metadata_file; /// Listener utilities for accepting client connections. /// -/// Re-exports functions from the internal `listener` module so integration tests -/// can exercise socket preparation and client handling without exposing the -/// entire module as part of the public API. +/// Re-exports selected listener APIs (functions and constants) so integration +/// tests can exercise socket preparation, client handling, and read limits +/// without exposing the entire `listener` module publicly. pub mod listener { - pub use crate::listener::{handle_client, prepare_listener, run_listener}; + pub use crate::listener::{ + CLIENT_READ_TIMEOUT_SECS, MAX_REQUEST_BYTES, handle_client, prepare_listener, run_listener, + }; } diff --git a/crates/comenqd/tests/daemon.rs b/crates/comenqd/tests/daemon.rs index 77057d1..7272c7f 100644 --- a/crates/comenqd/tests/daemon.rs +++ b/crates/comenqd/tests/daemon.rs @@ -283,27 +283,26 @@ mod worker_tests { let ctx = ctx.await; let server = Arc::new(ctx.server); let drained = Arc::new(Notify::new()); - let mut drained_notified = drained.notified(); let (shutdown_tx, shutdown_rx) = watch::channel(()); let control = WorkerControl { shutdown: shutdown_rx, hooks: WorkerHooks { enqueued: None, idle: None, - drained: Some(drained), + drained: Some(drained.clone()), }, }; let h = tokio::spawn(run_worker(ctx.cfg.clone(), ctx.rx, ctx.octo, control)); - if let Err(e) = timeout_with_retries( - DRAINED_NOTIFICATION, - "worker drained notification", - || async { - (&mut drained_notified).await; - Ok(()) - }, - ) - .await + if let Err(e) = + timeout_with_retries(DRAINED_NOTIFICATION, "worker drained notification", || { + let drained = drained.clone(); + async move { + drained.notified().await; + Ok(()) + } + }) + .await { let diagnostics = diagnose_queue_state(&ctx.cfg, &server, 0).await; tracing::error!("Timeout waiting for worker drained notification: {e}"); @@ -354,20 +353,22 @@ mod worker_tests { let server = Arc::new(ctx.server); let (shutdown_tx, shutdown_rx) = watch::channel(()); let enqueued = Arc::new(Notify::new()); - let mut enqueued_notified = enqueued.notified(); let control = WorkerControl { shutdown: shutdown_rx, hooks: WorkerHooks { - enqueued: Some(enqueued), + enqueued: Some(enqueued.clone()), idle: None, drained: None, }, }; let h = tokio::spawn(run_worker(ctx.cfg.clone(), ctx.rx, ctx.octo, control)); - timeout_with_retries(WORKER_SUCCESS, "worker enqueued", || async { - (&mut enqueued_notified).await; - Ok(()) + timeout_with_retries(WORKER_SUCCESS, "worker enqueued", || { + let enqueued = enqueued.clone(); + async move { + enqueued.notified().await; + Ok(()) + } }) .await .expect("worker picked up job"); diff --git a/crates/comenqd/tests/util.rs b/crates/comenqd/tests/util.rs index bca4d96..30c084f 100644 --- a/crates/comenqd/tests/util.rs +++ b/crates/comenqd/tests/util.rs @@ -11,15 +11,14 @@ pub const COVERAGE_MULTIPLIER: u64 = 5; pub const CI_MULTIPLIER: u64 = 2; pub const PROGRESSIVE_RETRY_PERCENTS: [u64; 3] = [50, 100, 150]; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub enum TestComplexity { - #[expect(dead_code, reason = "Constructed in integration tests but unused here")] Simple, Moderate, Complex, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct TimeoutConfig { base_seconds: u64, complexity: TestComplexity, @@ -92,6 +91,14 @@ where Err(format!("{operation_name} exhausted all retry attempts")) } +#[rstest] +#[case(TestComplexity::Simple)] +#[case(TestComplexity::Moderate)] +#[case(TestComplexity::Complex)] +fn uses_all_test_complexity_variants(#[case] complexity: TestComplexity) { + drop(TimeoutConfig::new(1, complexity)); +} + /// Map a task [`JoinError`] into a concise diagnostic message. /// /// ```ignore @@ -110,7 +117,7 @@ pub(crate) fn join_err(name: &str, e: JoinError) -> String { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] enum JoinScenario { Cancelled, Panicked,