Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions crates/comenqd/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ enum PostCommentError {
Timeout,
}

/// Constructs an authenticated Octocrab GitHub client using a personal access token.
///
/// # Arguments
///
/// * `token` - A GitHub personal access token used for authentication.
///
/// # Returns
///
/// Returns an `Octocrab` client instance on success, or an error if the client could not be built.
fn build_octocrab(token: &str) -> Result<Octocrab> {
Ok(Octocrab::builder()
.personal_token(token.to_string())
Expand All @@ -46,12 +55,17 @@ fn prepare_listener(path: &Path) -> Result<UnixListener> {
Ok(listener)
}

/// Asynchronously creates the queue directory and all necessary parent directories if they do not exist.
async fn ensure_queue_dir(path: &Path) -> Result<()> {
fs::create_dir_all(path).await?;
Ok(())
}

/// Post a comment to GitHub with a 10 second timeout.
/// Attempts to post a comment to a GitHub pull request, enforcing a 10-second timeout.
///
/// Returns `Ok(())` if the comment is successfully posted. If the GitHub API returns an error,
/// returns `PostCommentError::Api`. If the operation does not complete within 10 seconds,
/// returns `PostCommentError::Timeout`.
async fn post_comment(
octocrab: &Octocrab,
request: &CommentRequest,
Expand Down Expand Up @@ -83,7 +97,7 @@ async fn post_comment(
/// - or if the sender fails while awaiting shutdown.
///
/// # Examples
/// ```no_run
/// ```rust,no_run
/// use yaque::channel;
/// use tokio::sync::mpsc;
/// # async fn docs() -> anyhow::Result<()> {
Expand Down Expand Up @@ -208,27 +222,29 @@ async fn handle_client(mut stream: UnixStream, tx: mpsc::UnboundedSender<Vec<u8>
Ok(())
}

/// Dequeue requests and post comments to GitHub with a cooldown.
/// Processes queued comment requests and posts them to GitHub, enforcing a cooldown between attempts.
///
/// The worker continuously reads entries from the `yaque` queue and posts each
/// comment through the provided [`Octocrab`] instance. Successful posts commit
/// the queue entry, removing it from disk. Failures leave the message
/// uncommitted so it is retried on the next loop iteration.
/// Continuously receives comment requests from the persistent queue, attempts to post each comment to GitHub using the provided client, and commits successfully processed entries to remove them from the queue. Failed requests remain in the queue for retry. A fixed cooldown period, specified in the configuration, is applied after each attempt regardless of outcome. There is no exponential backoff; all retries use the same cooldown interval.
///
/// A cooldown period, configured via [`Config`], is enforced **between all
/// requests**, regardless of success or failure. After each attempt the worker
/// waits for the cooldown duration before handling the next queue item. There
/// is no exponential backoff; failed requests are retried after the same
/// cooldown period.
/// # Errors
///
/// # Parameters
/// - `config`: shared daemon configuration.
/// - `rx`: receiver half of the queue channel.
/// - `octocrab`: authenticated GitHub client.
/// Returns errors from queue operations, deserialisation, or GitHub client failures.
///
/// # Errors
/// Propagates I/O and serialization errors from queue operations and any error
/// returned by the GitHub client.
/// # Examples
///
/// ```no_run
/// use std::sync::Arc;
/// # use comenqd::{Config, run_worker};
/// # use yaque::Receiver;
/// # use octocrab::Octocrab;
/// # async fn example() -> anyhow::Result<()> {
/// let config = Arc::new(Config::default());
/// let rx: Receiver = /* obtain from yaque */ unimplemented!();
/// let octocrab = Arc::new(Octocrab::builder().build()?);
/// run_worker(config, rx, octocrab).await?;
/// # Ok(())
/// # }
/// ```
pub async fn run_worker(
config: Arc<Config>,
mut rx: Receiver,
Expand Down