From a4c1314a25ff1735380d198b9f5d1b58997fbc10 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 20:19:13 +0100 Subject: [PATCH 1/5] Handle malformed API responses Gracefully accept empty responses from the GitHub API and enforce a minimum cooldown to avoid busy retries. --- crates/comenqd/src/daemon.rs | 9 +++++---- docs/comenq-design.md | 34 ++++++++++++++++++++++++++-------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/crates/comenqd/src/daemon.rs b/crates/comenqd/src/daemon.rs index 4df9ac4..5c665f5 100644 --- a/crates/comenqd/src/daemon.rs +++ b/crates/comenqd/src/daemon.rs @@ -251,7 +251,7 @@ pub async fn run_worker( let request: CommentRequest = serde_json::from_slice(&guard)?; match post_comment(&octocrab, &request).await { - Ok(_) => { + Ok(_) | Err(PostCommentError::Api(octocrab::Error::Serde { .. })) => { guard.commit()?; } Err(PostCommentError::Api(e)) => { @@ -260,7 +260,7 @@ pub async fn run_worker( owner = %request.owner, repo = %request.repo, pr = request.pr_number, - "GitHub API call failed" + "GitHub API call failed", ); } Err(PostCommentError::Timeout) => { @@ -268,12 +268,13 @@ pub async fn run_worker( owner = %request.owner, repo = %request.repo, pr = request.pr_number, - "GitHub API call timed out" + "GitHub API call timed out", ); } } - tokio::time::sleep(Duration::from_secs(config.cooldown_period_seconds)).await; + let sleep_secs = std::cmp::max(1, config.cooldown_period_seconds); + tokio::time::sleep(Duration::from_secs(sleep_secs)).await; } } diff --git a/docs/comenq-design.md b/docs/comenq-design.md index 452834c..7e845b8 100644 --- a/docs/comenq-design.md +++ b/docs/comenq-design.md @@ -1000,21 +1000,36 @@ async fn run_worker(config: Arc, mut rx: Receiver, octoc loop { let guard = rx.recv().await?; let request = &*guard; - info!("Processing comment for PR #{}: {}/{}", request.pr_number, request.owner, request.repo); - - match octocrab.issues(&request.owner, &request.repo).create_comment(request.pr_number, &request.body).await { - Ok(comment) => { - info!("Successfully posted comment: {}", comment.html_url); + info!( + "Processing comment for PR #{}: {}/{}", + request.pr_number, + request.owner, + request.repo + ); + + match octocrab + .issues(&request.owner, &request.repo) + .create_comment(request.pr_number, &request.body) + .await + { + Ok(_) | Err(octocrab::Error::Serde { .. }) => { guard.commit()?; } Err(e) => { - error!("Failed to post comment for PR #{}: {}. Requeuing.", request.pr_number, e); + error!( + "Failed to post comment for PR #{}: {}. Requeuing.", + request.pr_number, + e + ); // Guard is dropped, job is automatically requeued. } } info!("Entering {}s cooldown period.", config.cooldown_period_seconds); - tokio::time::sleep(Duration::from_secs(config.cooldown_period_seconds)).await; + tokio::time::sleep( + Duration::from_secs(std::cmp::max(1, config.cooldown_period_seconds)) + ) + .await; } } ``` @@ -1034,7 +1049,10 @@ single-writer semantics without per-connection locking. The worker's cooling-off period is configured via `cooldown_period_seconds` and defaults to 960 seconds (16 minutes) to provide ample headroom against GitHub's -secondary rate limits. +secondary rate limits. A minimum delay of one second is enforced even if a +lower value is configured to prevent a busy retry loop. Responses that cannot +be deserialised are treated as successful because the daemon discards the +response body. GitHub API calls are wrapped in `tokio::time::timeout` with a 10-second limit to ensure the worker does not block indefinitely if the network stalls. From 3107d913b0d43a7cddf4e9753bb41302480b391d Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 21:11:40 +0100 Subject: [PATCH 2/5] Keep worker test queue directory --- crates/comenqd/src/daemon.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/comenqd/src/daemon.rs b/crates/comenqd/src/daemon.rs index 5c665f5..aa41d7b 100644 --- a/crates/comenqd/src/daemon.rs +++ b/crates/comenqd/src/daemon.rs @@ -332,7 +332,9 @@ mod tests { path.exists() } - async fn setup_run_worker(status: u16) -> (MockServer, Arc, Receiver, Arc) { + async fn setup_run_worker( + status: u16, + ) -> (TempDir, MockServer, Arc, Receiver, Arc) { let dir = tempdir().expect("tempdir"); let cfg = Arc::new(Config { cooldown_period_seconds: 0, @@ -358,7 +360,7 @@ mod tests { .await; let octo = octocrab_for(&server); - (server, cfg, rx, octo) + (dir, server, cfg, rx, octo) } #[tokio::test] @@ -452,7 +454,7 @@ mod tests { #[tokio::test] async fn run_worker_commits_on_success() { - let (server, cfg, rx, octo) = setup_run_worker(201).await; + let (_dir, server, cfg, rx, octo) = setup_run_worker(201).await; let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); sleep(Duration::from_millis(50)).await; h.abort(); @@ -462,7 +464,7 @@ mod tests { #[tokio::test] async fn run_worker_requeues_on_error() { - let (server, cfg, rx, octo) = setup_run_worker(500).await; + let (_dir, server, cfg, rx, octo) = setup_run_worker(500).await; let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); sleep(Duration::from_millis(50)).await; h.abort(); From 09acb4576e1ede61d9f62caffec55a7e3903be0a Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 23:50:23 +0100 Subject: [PATCH 3/5] Extend worker test waits --- crates/comenqd/src/daemon.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/comenqd/src/daemon.rs b/crates/comenqd/src/daemon.rs index aa41d7b..07fed1c 100644 --- a/crates/comenqd/src/daemon.rs +++ b/crates/comenqd/src/daemon.rs @@ -456,7 +456,10 @@ mod tests { async fn run_worker_commits_on_success() { let (_dir, server, cfg, rx, octo) = setup_run_worker(201).await; let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); - sleep(Duration::from_millis(50)).await; + // The worker enforces a minimum one second cooldown between attempts. + // Allow slightly longer than that so the job is processed before we + // abort the task. + sleep(Duration::from_millis(1100)).await; h.abort(); assert_eq!(server.received_requests().await.unwrap().len(), 1); assert_eq!(std::fs::read_dir(&cfg.queue_path).unwrap().count(), 0); @@ -466,7 +469,9 @@ mod tests { async fn run_worker_requeues_on_error() { let (_dir, server, cfg, rx, octo) = setup_run_worker(500).await; let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); - sleep(Duration::from_millis(50)).await; + // Allow the worker enough time to handle the job and wait for the + // mandated cooldown period. + sleep(Duration::from_millis(1100)).await; h.abort(); assert_eq!(server.received_requests().await.unwrap().len(), 1); assert!(std::fs::read_dir(&cfg.queue_path).unwrap().count() > 0); From 7de056e87a8cd191b31b49ffc7925da856ca9e7d Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 4 Aug 2025 01:34:46 +0100 Subject: [PATCH 4/5] Document worker handling of malformed API responses --- crates/comenqd/src/daemon.rs | 27 ++++++++++++++++++++++++++- docs/comenq-design.md | 2 +- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/crates/comenqd/src/daemon.rs b/crates/comenqd/src/daemon.rs index 07fed1c..8040981 100644 --- a/crates/comenqd/src/daemon.rs +++ b/crates/comenqd/src/daemon.rs @@ -251,6 +251,12 @@ pub async fn run_worker( let request: CommentRequest = serde_json::from_slice(&guard)?; match post_comment(&octocrab, &request).await { + // GitHub occasionally returns malformed JSON even when the comment + // is posted successfully. Retrying would loop indefinitely because + // the response body is ignored. Commit the job and drop the + // response; see + // docs/comenq-design.md#the-github-comment-posting-worker-task_process_queue + // for details. Ok(_) | Err(PostCommentError::Api(octocrab::Error::Serde { .. })) => { guard.commit()?; } @@ -334,6 +340,13 @@ mod tests { async fn setup_run_worker( status: u16, + ) -> (TempDir, MockServer, Arc, Receiver, Arc) { + setup_run_worker_with_body(status, "{}").await + } + + async fn setup_run_worker_with_body( + status: u16, + body: &str, ) -> (TempDir, MockServer, Arc, Receiver, Arc) { let dir = tempdir().expect("tempdir"); let cfg = Arc::new(Config { @@ -355,7 +368,7 @@ mod tests { let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/repos/o/r/issues/1/comments")) - .respond_with(ResponseTemplate::new(status).set_body_raw("{}", "application/json")) + .respond_with(ResponseTemplate::new(status).set_body_raw(body, "application/json")) .mount(&server) .await; @@ -476,4 +489,16 @@ mod tests { assert_eq!(server.received_requests().await.unwrap().len(), 1); assert!(std::fs::read_dir(&cfg.queue_path).unwrap().count() > 0); } + + #[tokio::test] + async fn run_worker_drops_on_serde_error() { + let (_dir, server, cfg, rx, octo) = setup_run_worker_with_body(201, "not json").await; + let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); + // Allow the worker enough time to handle the job and wait for the + // mandated cooldown period. + sleep(Duration::from_millis(1100)).await; + h.abort(); + assert_eq!(server.received_requests().await.unwrap().len(), 1); + assert_eq!(std::fs::read_dir(&cfg.queue_path).unwrap().count(), 0); + } } diff --git a/docs/comenq-design.md b/docs/comenq-design.md index 7e845b8..5af39d1 100644 --- a/docs/comenq-design.md +++ b/docs/comenq-design.md @@ -1052,7 +1052,7 @@ defaults to 960 seconds (16 minutes) to provide ample headroom against GitHub's secondary rate limits. A minimum delay of one second is enforced even if a lower value is configured to prevent a busy retry loop. Responses that cannot be deserialised are treated as successful because the daemon discards the -response body. +response body; retrying malformed responses would loop indefinitely. GitHub API calls are wrapped in `tokio::time::timeout` with a 10-second limit to ensure the worker does not block indefinitely if the network stalls. From 565eb66248fdfe597515d27c6b75e8154399cdbd Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 4 Aug 2025 17:29:39 +0100 Subject: [PATCH 5/5] Clear worker test queue between runs --- crates/comenqd/src/daemon.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/comenqd/src/daemon.rs b/crates/comenqd/src/daemon.rs index 8040981..cffdf8d 100644 --- a/crates/comenqd/src/daemon.rs +++ b/crates/comenqd/src/daemon.rs @@ -353,6 +353,9 @@ mod tests { cooldown_period_seconds: 0, ..temp_config(&dir) }); + // Clear any existing queue files to ensure clean test isolation + std::fs::remove_dir_all(&cfg.queue_path).ok(); + std::fs::create_dir_all(&cfg.queue_path).expect("create queue dir"); let (mut sender, rx) = channel(&cfg.queue_path).expect("channel"); let req = CommentRequest { owner: "o".into(),