Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions crates/comenqd/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ pub use crate::util::is_metadata_file;

/// Listener utilities for accepting client connections.
///
/// Re-exports functions from the internal `listener` module so integration tests
/// can exercise socket preparation and client handling without exposing the
/// entire module as part of the public API.
/// Re-exports selected listener APIs (functions and constants) so integration
/// tests can exercise socket preparation, client handling, and read limits
/// without exposing the entire `listener` module publicly.
pub mod listener {
pub use crate::listener::{handle_client, prepare_listener, run_listener};
pub use crate::listener::{
CLIENT_READ_TIMEOUT_SECS, MAX_REQUEST_BYTES, handle_client, prepare_listener, run_listener,
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
33 changes: 17 additions & 16 deletions crates/comenqd/tests/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,27 +283,26 @@ mod worker_tests {
let ctx = ctx.await;
let server = Arc::new(ctx.server);
let drained = Arc::new(Notify::new());
let mut drained_notified = drained.notified();
let (shutdown_tx, shutdown_rx) = watch::channel(());
let control = WorkerControl {
shutdown: shutdown_rx,
hooks: WorkerHooks {
enqueued: None,
idle: None,
drained: Some(drained),
drained: Some(drained.clone()),
},
};
let h = tokio::spawn(run_worker(ctx.cfg.clone(), ctx.rx, ctx.octo, control));

if let Err(e) = timeout_with_retries(
DRAINED_NOTIFICATION,
"worker drained notification",
|| async {
(&mut drained_notified).await;
Ok(())
},
)
.await
if let Err(e) =
timeout_with_retries(DRAINED_NOTIFICATION, "worker drained notification", || {
let drained = drained.clone();
async move {
drained.notified().await;
Ok(())
}
})
.await
{
let diagnostics = diagnose_queue_state(&ctx.cfg, &server, 0).await;
tracing::error!("Timeout waiting for worker drained notification: {e}");
Expand Down Expand Up @@ -354,20 +353,22 @@ mod worker_tests {
let server = Arc::new(ctx.server);
let (shutdown_tx, shutdown_rx) = watch::channel(());
let enqueued = Arc::new(Notify::new());
let mut enqueued_notified = enqueued.notified();
let control = WorkerControl {
shutdown: shutdown_rx,
hooks: WorkerHooks {
enqueued: Some(enqueued),
enqueued: Some(enqueued.clone()),
idle: None,
drained: None,
},
};
let h = tokio::spawn(run_worker(ctx.cfg.clone(), ctx.rx, ctx.octo, control));

timeout_with_retries(WORKER_SUCCESS, "worker enqueued", || async {
(&mut enqueued_notified).await;
Ok(())
timeout_with_retries(WORKER_SUCCESS, "worker enqueued", || {
let enqueued = enqueued.clone();
async move {
enqueued.notified().await;
Ok(())
}
})
.await
.expect("worker picked up job");
Expand Down
15 changes: 11 additions & 4 deletions crates/comenqd/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ pub const COVERAGE_MULTIPLIER: u64 = 5;
pub const CI_MULTIPLIER: u64 = 2;
pub const PROGRESSIVE_RETRY_PERCENTS: [u64; 3] = [50, 100, 150];

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub enum TestComplexity {
#[expect(dead_code, reason = "Constructed in integration tests but unused here")]
Simple,
Moderate,
Complex,
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub struct TimeoutConfig {
base_seconds: u64,
complexity: TestComplexity,
Expand Down Expand Up @@ -92,6 +91,14 @@ where
Err(format!("{operation_name} exhausted all retry attempts"))
}

#[rstest]
#[case(TestComplexity::Simple)]
#[case(TestComplexity::Moderate)]
#[case(TestComplexity::Complex)]
fn uses_all_test_complexity_variants(#[case] complexity: TestComplexity) {
drop(TimeoutConfig::new(1, complexity));
}

/// Map a task [`JoinError`] into a concise diagnostic message.
///
/// ```ignore
Expand All @@ -110,7 +117,7 @@ pub(crate) fn join_err(name: &str, e: JoinError) -> String {
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
enum JoinScenario {
Cancelled,
Panicked,
Expand Down
Loading