diff --git a/crates/comenqd/src/daemon.rs b/crates/comenqd/src/daemon.rs index 4df9ac4..cffdf8d 100644 --- a/crates/comenqd/src/daemon.rs +++ b/crates/comenqd/src/daemon.rs @@ -251,7 +251,13 @@ pub async fn run_worker( let request: CommentRequest = serde_json::from_slice(&guard)?; match post_comment(&octocrab, &request).await { - Ok(_) => { + // GitHub occasionally returns malformed JSON even when the comment + // is posted successfully. Retrying would loop indefinitely because + // the response body is ignored. Commit the job and drop the + // response; see + // docs/comenq-design.md#the-github-comment-posting-worker-task_process_queue + // for details. + Ok(_) | Err(PostCommentError::Api(octocrab::Error::Serde { .. })) => { guard.commit()?; } Err(PostCommentError::Api(e)) => { @@ -260,7 +266,7 @@ pub async fn run_worker( owner = %request.owner, repo = %request.repo, pr = request.pr_number, - "GitHub API call failed" + "GitHub API call failed", ); } Err(PostCommentError::Timeout) => { @@ -268,12 +274,13 @@ pub async fn run_worker( owner = %request.owner, repo = %request.repo, pr = request.pr_number, - "GitHub API call timed out" + "GitHub API call timed out", ); } } - tokio::time::sleep(Duration::from_secs(config.cooldown_period_seconds)).await; + let sleep_secs = std::cmp::max(1, config.cooldown_period_seconds); + tokio::time::sleep(Duration::from_secs(sleep_secs)).await; } } @@ -331,12 +338,24 @@ mod tests { path.exists() } - async fn setup_run_worker(status: u16) -> (MockServer, Arc, Receiver, Arc) { + async fn setup_run_worker( + status: u16, + ) -> (TempDir, MockServer, Arc, Receiver, Arc) { + setup_run_worker_with_body(status, "{}").await + } + + async fn setup_run_worker_with_body( + status: u16, + body: &str, + ) -> (TempDir, MockServer, Arc, Receiver, Arc) { let dir = tempdir().expect("tempdir"); let cfg = Arc::new(Config { cooldown_period_seconds: 0, ..temp_config(&dir) }); + // Clear any existing queue files to ensure clean test isolation + std::fs::remove_dir_all(&cfg.queue_path).ok(); + std::fs::create_dir_all(&cfg.queue_path).expect("create queue dir"); let (mut sender, rx) = channel(&cfg.queue_path).expect("channel"); let req = CommentRequest { owner: "o".into(), @@ -352,12 +371,12 @@ mod tests { let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/repos/o/r/issues/1/comments")) - .respond_with(ResponseTemplate::new(status).set_body_raw("{}", "application/json")) + .respond_with(ResponseTemplate::new(status).set_body_raw(body, "application/json")) .mount(&server) .await; let octo = octocrab_for(&server); - (server, cfg, rx, octo) + (dir, server, cfg, rx, octo) } #[tokio::test] @@ -451,9 +470,12 @@ mod tests { #[tokio::test] async fn run_worker_commits_on_success() { - let (server, cfg, rx, octo) = setup_run_worker(201).await; + let (_dir, server, cfg, rx, octo) = setup_run_worker(201).await; let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); - sleep(Duration::from_millis(50)).await; + // The worker enforces a minimum one second cooldown between attempts. + // Allow slightly longer than that so the job is processed before we + // abort the task. + sleep(Duration::from_millis(1100)).await; h.abort(); assert_eq!(server.received_requests().await.unwrap().len(), 1); assert_eq!(std::fs::read_dir(&cfg.queue_path).unwrap().count(), 0); @@ -461,11 +483,25 @@ mod tests { #[tokio::test] async fn run_worker_requeues_on_error() { - let (server, cfg, rx, octo) = setup_run_worker(500).await; + let (_dir, server, cfg, rx, octo) = setup_run_worker(500).await; let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); - sleep(Duration::from_millis(50)).await; + // Allow the worker enough time to handle the job and wait for the + // mandated cooldown period. + sleep(Duration::from_millis(1100)).await; h.abort(); assert_eq!(server.received_requests().await.unwrap().len(), 1); assert!(std::fs::read_dir(&cfg.queue_path).unwrap().count() > 0); } + + #[tokio::test] + async fn run_worker_drops_on_serde_error() { + let (_dir, server, cfg, rx, octo) = setup_run_worker_with_body(201, "not json").await; + let h = tokio::spawn(run_worker(cfg.clone(), rx, octo)); + // Allow the worker enough time to handle the job and wait for the + // mandated cooldown period. + sleep(Duration::from_millis(1100)).await; + h.abort(); + assert_eq!(server.received_requests().await.unwrap().len(), 1); + assert_eq!(std::fs::read_dir(&cfg.queue_path).unwrap().count(), 0); + } } diff --git a/docs/comenq-design.md b/docs/comenq-design.md index 452834c..5af39d1 100644 --- a/docs/comenq-design.md +++ b/docs/comenq-design.md @@ -1000,21 +1000,36 @@ async fn run_worker(config: Arc, mut rx: Receiver, octoc loop { let guard = rx.recv().await?; let request = &*guard; - info!("Processing comment for PR #{}: {}/{}", request.pr_number, request.owner, request.repo); - - match octocrab.issues(&request.owner, &request.repo).create_comment(request.pr_number, &request.body).await { - Ok(comment) => { - info!("Successfully posted comment: {}", comment.html_url); + info!( + "Processing comment for PR #{}: {}/{}", + request.pr_number, + request.owner, + request.repo + ); + + match octocrab + .issues(&request.owner, &request.repo) + .create_comment(request.pr_number, &request.body) + .await + { + Ok(_) | Err(octocrab::Error::Serde { .. }) => { guard.commit()?; } Err(e) => { - error!("Failed to post comment for PR #{}: {}. Requeuing.", request.pr_number, e); + error!( + "Failed to post comment for PR #{}: {}. Requeuing.", + request.pr_number, + e + ); // Guard is dropped, job is automatically requeued. } } info!("Entering {}s cooldown period.", config.cooldown_period_seconds); - tokio::time::sleep(Duration::from_secs(config.cooldown_period_seconds)).await; + tokio::time::sleep( + Duration::from_secs(std::cmp::max(1, config.cooldown_period_seconds)) + ) + .await; } } ``` @@ -1034,7 +1049,10 @@ single-writer semantics without per-connection locking. The worker's cooling-off period is configured via `cooldown_period_seconds` and defaults to 960 seconds (16 minutes) to provide ample headroom against GitHub's -secondary rate limits. +secondary rate limits. A minimum delay of one second is enforced even if a +lower value is configured to prevent a busy retry loop. Responses that cannot +be deserialised are treated as successful because the daemon discards the +response body; retrying malformed responses would loop indefinitely. GitHub API calls are wrapped in `tokio::time::timeout` with a 10-second limit to ensure the worker does not block indefinitely if the network stalls.