Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 340 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ cucumber = "0.20"
tokio = { workspace = true }
clap = { workspace = true }
comenq = { path = "crates/comenq" }
tempfile = { workspace = true }
comenqd = { path = "crates/comenqd" }
ortho_config = { git = "https://github.com/leynos/ortho-config.git", tag = "v0.4.0" }
tempfile = "3.10" # latest 3.x at time of writing; update as new patch versions release

[[test]]
name = "cucumber"
Expand All @@ -39,7 +41,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1.0"
thiserror = "1.0"
tempfile = "3"
ortho_config = { git = "https://github.com/leynos/ortho-config.git", tag = "v0.4.0" }
Comment thread
leynos marked this conversation as resolved.

[lints.clippy]
pedantic = { level = "warn", priority = -1 }
Expand Down
8 changes: 8 additions & 0 deletions crates/comenqd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name = "comenqd"
version = "0.1.0"
edition = "2024"


[dependencies]
tokio = { workspace = true }
clap = { workspace = true }
Expand All @@ -15,3 +16,10 @@ tracing-subscriber = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
comenq-lib = { path = "../.." }
ortho_config = { workspace = true }
figment = { version = "0.10", default-features = false, features = ["env", "toml"] }

Comment thread
leynos marked this conversation as resolved.
[dev-dependencies]
rstest = "0.18.0"
tempfile = "3.10" # latest 3.x at time of writing; update as new patch versions release
serial_test = "2"
226 changes: 226 additions & 0 deletions crates/comenqd/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
//! Configuration loading for the Comenqd daemon.
//!
//! The configuration is stored in `/etc/comenqd/config.toml`. Values may be
//! overridden by environment variables using the `COMENQD_` prefix.

use clap::Parser;
use figment::providers::Env;
use serde::{Deserialize, Serialize};
use std::io;
use std::path::{Path, PathBuf};

/// Default socket path when none is provided.
const DEFAULT_SOCKET_PATH: &str = "/run/comenq/comenq.sock";
/// Default queue directory when none is provided.
const DEFAULT_QUEUE_PATH: &str = "/var/lib/comenq/queue";
/// Default cooldown in seconds between comment posts.
const DEFAULT_COOLDOWN: u64 = 900;

/// Runtime configuration for the daemon.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Config {
/// GitHub Personal Access Token.
pub github_token: String,
/// Path to the Unix Domain Socket.
#[serde(default = "default_socket_path")]
pub socket_path: PathBuf,
/// Directory for the persistent queue.
#[serde(default = "default_queue_path")]
pub queue_path: PathBuf,
/// Cooldown between comment posts in seconds.
#[serde(default = "default_cooldown")]
pub cooldown_period_seconds: u64,
}

/// Command-line overrides for configuration values.
#[derive(Debug, Default, Parser, Serialize)]
struct CliArgs {
/// Path to the configuration file.
#[arg(short, long, value_name = "FILE", default_value = Config::DEFAULT_PATH)]
config: PathBuf,
/// GitHub Personal Access Token.
#[arg(long)]
github_token: Option<String>,
/// Override the Unix Domain Socket path.
#[arg(long)]
socket_path: Option<PathBuf>,
/// Override the queue directory.
#[arg(long)]
queue_path: Option<PathBuf>,
}

fn default_socket_path() -> PathBuf {
PathBuf::from(DEFAULT_SOCKET_PATH)
}

fn default_queue_path() -> PathBuf {
PathBuf::from(DEFAULT_QUEUE_PATH)
}

fn default_cooldown() -> u64 {
DEFAULT_COOLDOWN
}

impl Config {
/// Default location of the daemon configuration file.
pub const DEFAULT_PATH: &'static str = "/etc/comenqd/config.toml";

/// Load the configuration using command-line overrides and environment
/// variables.
#[expect(clippy::result_large_err, reason = "propagate figment errors")]
pub fn load() -> Result<Self, ortho_config::OrthoError> {
let args = CliArgs::parse();
Self::from_file_with_cli(&args.config, &args)
}

/// Load the configuration from the specified path, merging `COMENQD_*`
/// environment variables and CLI arguments over file values.
#[expect(clippy::result_large_err, reason = "propagate figment errors")]
pub fn from_file(path: &Path) -> Result<Self, ortho_config::OrthoError> {
Self::from_file_with_cli(path, &CliArgs::default())
}

#[expect(clippy::result_large_err, reason = "propagate figment errors")]
fn from_file_with_cli(path: &Path, cli: &CliArgs) -> Result<Self, ortho_config::OrthoError> {
Comment thread
leynos marked this conversation as resolved.
let mut fig = ortho_config::load_config_file(path)?.ok_or_else(|| {
ortho_config::OrthoError::File {
path: path.to_path_buf(),
source: Box::new(io::Error::new(
io::ErrorKind::NotFound,
"Configuration file not found",
)),
}
})?;

fig = fig.merge(Env::prefixed("COMENQD_").split("__"));
let mut cfg: Self = fig.extract().map_err(ortho_config::OrthoError::from)?;

if let Some(token) = &cli.github_token {
cfg.github_token = token.clone();
}
if let Some(socket) = &cli.socket_path {
cfg.socket_path = socket.clone();
}
if let Some(queue) = &cli.queue_path {
cfg.queue_path = queue.clone();
}
Ok(cfg)
}
}

#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
use std::fs;
use tempfile::tempdir;

struct EnvVarGuard {
key: String,
original: Option<String>,
}

impl EnvVarGuard {
fn set(key: &str, val: &str) -> Self {
let original = std::env::var(key).ok();
set_env_var(key, val);
Self {
key: key.to_string(),
original,
}
}
}

impl Drop for EnvVarGuard {
fn drop(&mut self) {
match &self.original {
Some(v) => set_env_var(&self.key, v),
None => remove_env_var(&self.key),
}
}
}

fn remove_env(key: &str) {
remove_env_var(key);
}

/// Safely set an environment variable for tests.
fn set_env_var(key: &str, val: &str) {
// Safety: tests using `serial_test::serial` run single-threaded.
unsafe { std::env::set_var(key, val) };
}

/// Safely remove an environment variable for tests.
fn remove_env_var(key: &str) {
// Safety: tests using `serial_test::serial` run single-threaded.
unsafe { std::env::remove_var(key) };
}

#[rstest]
#[serial_test::serial]
fn loads_from_file() {
let dir = tempdir().unwrap();
let path = dir.path().join("config.toml");
fs::write(
&path,
"github_token='abc'\nsocket_path='/tmp/s.sock'\nqueue_path='/tmp/q'",
)
.unwrap();
remove_env("COMENQD_SOCKET_PATH");
let cfg = Config::from_file(&path).unwrap();
assert_eq!(cfg.github_token, "abc");
assert_eq!(cfg.socket_path, PathBuf::from("/tmp/s.sock"));
assert_eq!(cfg.queue_path, PathBuf::from("/tmp/q"));
}

#[rstest]
#[serial_test::serial]
fn error_when_missing_file() {
let path = PathBuf::from("/nonexistent/file.toml");
let res = Config::from_file(&path);
assert!(res.is_err());
}

#[rstest]
#[serial_test::serial]
fn env_vars_override_file() {
let dir = tempdir().unwrap();
let path = dir.path().join("config.toml");
fs::write(&path, "github_token='abc'\nsocket_path='/tmp/s.sock'").unwrap();
let _guard = EnvVarGuard::set("COMENQD_SOCKET_PATH", "/tmp/override.sock");
let cfg = Config::from_file(&path).unwrap();
assert_eq!(cfg.socket_path, PathBuf::from("/tmp/override.sock"));
}

#[rstest]
#[serial_test::serial]
fn error_with_invalid_toml() {
let dir = tempdir().unwrap();
let path = dir.path().join("config.toml");
fs::write(&path, "github_token='abc' this is not toml").unwrap();
let res = Config::from_file(&path);
assert!(res.is_err());
}

#[rstest]
#[serial_test::serial]
fn error_when_missing_token() {
let dir = tempdir().unwrap();
let path = dir.path().join("config.toml");
fs::write(&path, "socket_path='/tmp/s.sock'").unwrap();
let res = Config::from_file(&path);
assert!(res.is_err());
}

#[rstest]
#[serial_test::serial]
fn defaults_are_applied() {
let dir = tempdir().unwrap();
let path = dir.path().join("config.toml");
fs::write(&path, "github_token='abc'").unwrap();
let cfg = Config::from_file(&path).unwrap();
assert_eq!(cfg.socket_path, PathBuf::from("/run/comenq/comenq.sock"));
assert_eq!(cfg.queue_path, PathBuf::from("/var/lib/comenq/queue"));
assert_eq!(cfg.cooldown_period_seconds, DEFAULT_COOLDOWN);
}
}
98 changes: 98 additions & 0 deletions crates/comenqd/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Asynchronous daemon tasks for comenqd.
//!
//! This module provides the run function used by `main` which spawns the
//! Unix socket listener and the queue worker.

use crate::config::Config;
use anyhow::Result;
use comenq_lib::CommentRequest;
use octocrab::Octocrab;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::{UnixListener, UnixStream};
use yaque::{Receiver, Sender, channel};

fn build_octocrab(token: &str) -> Result<Octocrab> {
Ok(Octocrab::builder()
.personal_token(token.to_string())
.build()?)
}

fn prepare_listener(path: &Path) -> Result<UnixListener> {
if fs::metadata(path).is_ok() {
fs::remove_file(path)?;
}
let listener = UnixListener::bind(path)?;
fs::set_permissions(path, fs::Permissions::from_mode(0o660))?;
Comment thread
leynos marked this conversation as resolved.
Ok(listener)
}

/// Start the daemon with the provided configuration.
pub async fn run(config: Config) -> Result<()> {
let octocrab = Arc::new(build_octocrab(&config.github_token)?);
let (tx, rx) = channel(&config.queue_path)?;
let cfg = Arc::new(config);

let listener = tokio::spawn(run_listener(cfg.clone(), tx));
let worker = tokio::spawn(run_worker(cfg.clone(), rx, octocrab));
Comment thread
leynos marked this conversation as resolved.

tokio::select! {
res = listener => match res {
Ok(inner) => inner?,
Err(e) => return Err(e.into()),
},
res = worker => match res {
Ok(inner) => inner?,
Err(e) => return Err(e.into()),
},
}

Ok(())
}

async fn run_listener(config: Arc<Config>, mut tx: Sender) -> Result<()> {
let listener = prepare_listener(&config.socket_path)?;

loop {
let (stream, _) = listener.accept().await?;
if let Err(e) = handle_client(stream, &mut tx).await {
tracing::warn!(error = %e, "Client handling failed");
}
}
}

async fn handle_client(mut stream: UnixStream, tx: &mut Sender) -> Result<()> {
let mut buffer = Vec::new();
stream.read_to_end(&mut buffer).await?;
let request: CommentRequest = serde_json::from_slice(&buffer)?;
Comment thread
leynos marked this conversation as resolved.
let bytes = serde_json::to_vec(&request)?;
tx.send(bytes).await?;
Ok(())
}

async fn run_worker(config: Arc<Config>, mut rx: Receiver, octocrab: Arc<Octocrab>) -> Result<()> {
loop {
let guard = rx.recv().await?;
let request: CommentRequest = serde_json::from_slice(&guard)?;

let issues = octocrab.issues(&request.owner, &request.repo);
let post = issues.create_comment(request.pr_number, &request.body);

match tokio::time::timeout(Duration::from_secs(10), post).await {
Ok(Ok(_)) => {
guard.commit()?;
tokio::time::sleep(Duration::from_secs(config.cooldown_period_seconds)).await;
}
Ok(Err(e)) => {
tracing::error!(error = %e, owner = %request.owner, repo = %request.repo, pr = request.pr_number, "GitHub API call failed");
}
Err(e) => {
tracing::error!(error = %e, "Timed out posting comment");
}
}
}
}
18 changes: 18 additions & 0 deletions crates/comenqd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! Library components for the Comenqd daemon.
//!
//! # Overview
//! This crate exposes:
//! - [`config::Config`] — typed, validated daemon configuration loaded from
//! `/etc/comenqd/config.toml` with environment and CLI overrides.
//! - Further daemon-specific helpers (to be added).
//!
//! # Examples
//! ```rust,no_run
//! use comenqd::config::Config;
//!
//! let cfg = Config::load().expect("configuration must be valid");
//! println!("socket: {}", cfg.socket_path.display());
//! ```

pub mod config;
pub mod daemon;
Loading