Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions crates/comenqd/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,13 @@ pub async fn run_worker(
let request: CommentRequest = serde_json::from_slice(&guard)?;

match post_comment(&octocrab, &request).await {
Ok(_) => {
// GitHub occasionally returns malformed JSON even when the comment
// is posted successfully. Retrying would loop indefinitely because
// the response body is ignored. Commit the job and drop the
// response; see
// docs/comenq-design.md#the-github-comment-posting-worker-task_process_queue
// for details.
Ok(_) | Err(PostCommentError::Api(octocrab::Error::Serde { .. })) => {
guard.commit()?;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Err(PostCommentError::Api(e)) => {
Expand All @@ -260,20 +266,21 @@ pub async fn run_worker(
owner = %request.owner,
repo = %request.repo,
pr = request.pr_number,
"GitHub API call failed"
"GitHub API call failed",
);
}
Err(PostCommentError::Timeout) => {
tracing::error!(
owner = %request.owner,
repo = %request.repo,
pr = request.pr_number,
"GitHub API call timed out"
"GitHub API call timed out",
);
}
}

tokio::time::sleep(Duration::from_secs(config.cooldown_period_seconds)).await;
let sleep_secs = std::cmp::max(1, config.cooldown_period_seconds);
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
}
}

Expand Down Expand Up @@ -331,12 +338,24 @@ mod tests {
path.exists()
}

async fn setup_run_worker(status: u16) -> (MockServer, Arc<Config>, Receiver, Arc<Octocrab>) {
async fn setup_run_worker(
status: u16,
) -> (TempDir, MockServer, Arc<Config>, Receiver, Arc<Octocrab>) {
setup_run_worker_with_body(status, "{}").await
}

async fn setup_run_worker_with_body(
status: u16,
body: &str,
) -> (TempDir, MockServer, Arc<Config>, Receiver, Arc<Octocrab>) {
let dir = tempdir().expect("tempdir");
let cfg = Arc::new(Config {
cooldown_period_seconds: 0,
..temp_config(&dir)
});
// Clear any existing queue files to ensure clean test isolation
std::fs::remove_dir_all(&cfg.queue_path).ok();
std::fs::create_dir_all(&cfg.queue_path).expect("create queue dir");
let (mut sender, rx) = channel(&cfg.queue_path).expect("channel");
let req = CommentRequest {
owner: "o".into(),
Expand All @@ -352,12 +371,12 @@ mod tests {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/repos/o/r/issues/1/comments"))
.respond_with(ResponseTemplate::new(status).set_body_raw("{}", "application/json"))
.respond_with(ResponseTemplate::new(status).set_body_raw(body, "application/json"))
.mount(&server)
.await;

let octo = octocrab_for(&server);
(server, cfg, rx, octo)
(dir, server, cfg, rx, octo)
}

#[tokio::test]
Expand Down Expand Up @@ -451,21 +470,38 @@ mod tests {

#[tokio::test]
async fn run_worker_commits_on_success() {
let (server, cfg, rx, octo) = setup_run_worker(201).await;
let (_dir, server, cfg, rx, octo) = setup_run_worker(201).await;
let h = tokio::spawn(run_worker(cfg.clone(), rx, octo));
sleep(Duration::from_millis(50)).await;
// The worker enforces a minimum one second cooldown between attempts.
// Allow slightly longer than that so the job is processed before we
// abort the task.
sleep(Duration::from_millis(1100)).await;
h.abort();
assert_eq!(server.received_requests().await.unwrap().len(), 1);
assert_eq!(std::fs::read_dir(&cfg.queue_path).unwrap().count(), 0);
}

#[tokio::test]
async fn run_worker_requeues_on_error() {
let (server, cfg, rx, octo) = setup_run_worker(500).await;
let (_dir, server, cfg, rx, octo) = setup_run_worker(500).await;
let h = tokio::spawn(run_worker(cfg.clone(), rx, octo));
sleep(Duration::from_millis(50)).await;
// Allow the worker enough time to handle the job and wait for the
// mandated cooldown period.
sleep(Duration::from_millis(1100)).await;
h.abort();
assert_eq!(server.received_requests().await.unwrap().len(), 1);
assert!(std::fs::read_dir(&cfg.queue_path).unwrap().count() > 0);
}

#[tokio::test]
async fn run_worker_drops_on_serde_error() {
let (_dir, server, cfg, rx, octo) = setup_run_worker_with_body(201, "not json").await;
let h = tokio::spawn(run_worker(cfg.clone(), rx, octo));
// Allow the worker enough time to handle the job and wait for the
// mandated cooldown period.
sleep(Duration::from_millis(1100)).await;
h.abort();
assert_eq!(server.received_requests().await.unwrap().len(), 1);
assert_eq!(std::fs::read_dir(&cfg.queue_path).unwrap().count(), 0);
}
}
34 changes: 26 additions & 8 deletions docs/comenq-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -1000,21 +1000,36 @@ async fn run_worker(config: Arc<Config>, mut rx: Receiver<CommentRequest>, octoc
loop {
let guard = rx.recv().await?;
let request = &*guard;
info!("Processing comment for PR #{}: {}/{}", request.pr_number, request.owner, request.repo);

match octocrab.issues(&request.owner, &request.repo).create_comment(request.pr_number, &request.body).await {
Ok(comment) => {
info!("Successfully posted comment: {}", comment.html_url);
info!(
"Processing comment for PR #{}: {}/{}",
request.pr_number,
request.owner,
request.repo
);

match octocrab
.issues(&request.owner, &request.repo)
.create_comment(request.pr_number, &request.body)
.await
{
Ok(_) | Err(octocrab::Error::Serde { .. }) => {
guard.commit()?;
}
Err(e) => {
error!("Failed to post comment for PR #{}: {}. Requeuing.", request.pr_number, e);
error!(
"Failed to post comment for PR #{}: {}. Requeuing.",
request.pr_number,
e
);
// Guard is dropped, job is automatically requeued.
}
}

info!("Entering {}s cooldown period.", config.cooldown_period_seconds);
tokio::time::sleep(Duration::from_secs(config.cooldown_period_seconds)).await;
tokio::time::sleep(
Duration::from_secs(std::cmp::max(1, config.cooldown_period_seconds))
)
.await;
}
}
```
Expand All @@ -1034,7 +1049,10 @@ single-writer semantics without per-connection locking.

The worker's cooling-off period is configured via `cooldown_period_seconds` and
defaults to 960 seconds (16 minutes) to provide ample headroom against GitHub's
secondary rate limits.
secondary rate limits. A minimum delay of one second is enforced even if a
lower value is configured to prevent a busy retry loop. Responses that cannot
Comment thread
leynos marked this conversation as resolved.
be deserialised are treated as successful because the daemon discards the
response body; retrying malformed responses would loop indefinitely.

GitHub API calls are wrapped in `tokio::time::timeout` with a 10-second limit
to ensure the worker does not block indefinitely if the network stalls.
Expand Down
Loading