Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ comenq = { path = "crates/comenq" }
comenqd = { path = "crates/comenqd" }
ortho_config = { git = "https://github.com/leynos/ortho-config.git", tag = "v0.4.0" }
tempfile = "3.10" # latest 3.x at time of writing; update as new patch versions release
yaque = { workspace = true }

[[test]]
name = "cucumber"
Expand Down
148 changes: 140 additions & 8 deletions crates/comenqd/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Duration;
use tokio::fs;
use tokio::io::AsyncReadExt;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{mpsc, watch};
use yaque::{Receiver, Sender, channel};

fn build_octocrab(token: &str) -> Result<Octocrab> {
Expand All @@ -37,15 +38,30 @@ async fn ensure_queue_dir(path: &Path) -> Result<()> {
Ok(())
}

pub async fn queue_writer(
mut sender: Sender,
mut rx: mpsc::UnboundedReceiver<Vec<u8>>,
) -> Result<()> {
while let Some(bytes) = rx.recv().await {
if let Err(e) = sender.send(bytes).await {
tracing::error!(error = %e, "Queue enqueue failed");
}
}
Ok(())
}

/// Start the daemon with the provided configuration.
pub async fn run(config: Config) -> Result<()> {
ensure_queue_dir(&config.queue_path).await?;
tracing::info!(queue = %config.queue_path.display(), "Queue directory prepared");
let octocrab = Arc::new(build_octocrab(&config.github_token)?);
let (tx, rx) = channel(&config.queue_path)?;
let (queue_tx, rx) = channel(&config.queue_path)?;
let (client_tx, client_rx) = mpsc::unbounded_channel();
let cfg = Arc::new(config);
let (shutdown_tx, shutdown_rx) = watch::channel(());

let listener = tokio::spawn(run_listener(cfg.clone(), tx));
let writer = tokio::spawn(queue_writer(queue_tx, client_rx));
let listener = tokio::spawn(run_listener(cfg.clone(), client_tx, shutdown_rx));
let worker = tokio::spawn(run_worker(cfg.clone(), rx, octocrab));

tokio::select! {
Expand All @@ -59,26 +75,50 @@ pub async fn run(config: Config) -> Result<()> {
},
}

let _ = shutdown_tx.send(());
writer.await??;

Ok(())
}

async fn run_listener(config: Arc<Config>, mut tx: Sender) -> Result<()> {
pub async fn run_listener(
config: Arc<Config>,
tx: mpsc::UnboundedSender<Vec<u8>>,
mut shutdown: watch::Receiver<()>,
) -> Result<()> {
let listener = prepare_listener(&config.socket_path)?;

loop {
let (stream, _) = listener.accept().await?;
if let Err(e) = handle_client(stream, &mut tx).await {
tracing::warn!(error = %e, "Client handling failed");
tokio::select! {
res = listener.accept() => match res {
Ok((stream, _)) => {
let tx_clone = tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(stream, tx_clone).await {
tracing::warn!(error = %e, "Client handling failed");
}
});
}
Err(e) => {
tracing::error!(error = %e, "Failed to accept client connection");
tokio::time::sleep(Duration::from_millis(100)).await;
}
},
_ = shutdown.changed() => {
break;
}
}
}
Ok(())
}

async fn handle_client(mut stream: UnixStream, tx: &mut Sender) -> Result<()> {
async fn handle_client(mut stream: UnixStream, tx: mpsc::UnboundedSender<Vec<u8>>) -> Result<()> {
let mut buffer = Vec::new();
stream.read_to_end(&mut buffer).await?;
let request: CommentRequest = serde_json::from_slice(&buffer)?;
let bytes = serde_json::to_vec(&request)?;
tx.send(bytes).await?;
tx.send(bytes)
.map_err(|_| anyhow::anyhow!("queue writer dropped"))?;
Ok(())
}

Expand Down Expand Up @@ -107,8 +147,12 @@ async fn run_worker(config: Arc<Config>, mut rx: Receiver, octocrab: Arc<Octocra

#[cfg(test)]
mod tests {
Comment thread
leynos marked this conversation as resolved.
//! Tests for the daemon tasks.
use super::*;
use tempfile::tempdir;
use tokio::io::AsyncWriteExt;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{mpsc, watch};
use tokio::time::{Duration, Instant, sleep};

#[tokio::test]
Expand Down Expand Up @@ -147,4 +191,92 @@ mod tests {
handle.abort();
assert!(cfg.queue_path.is_dir(), "queue directory not created");
}

#[tokio::test]
async fn prepare_listener_sets_permissions() {
let dir = tempdir().expect("tempdir");
let sock = dir.path().join("sock");
stdfs::write(&sock, b"stale").expect("create stale file");

let listener = prepare_listener(&sock).expect("prepare listener");
drop(listener);

let meta = stdfs::metadata(&sock).expect("metadata");
assert_eq!(meta.permissions().mode() & 0o777, 0o660);
}

#[tokio::test]
async fn handle_client_enqueues_request() {
let dir = tempdir().expect("tempdir");
let queue_path = dir.path().join("q");
let (sender, mut receiver) = channel(&queue_path).expect("channel");
let (client_tx, mut writer_rx) = mpsc::unbounded_channel();
let writer = tokio::spawn(queue_writer(sender, writer_rx));

let (mut client, server) = UnixStream::pair().expect("pair");
let handle = tokio::spawn(handle_client(server, client_tx));

let req = CommentRequest {
owner: "o".into(),
repo: "r".into(),
pr_number: 1,
body: "b".into(),
};
let payload = serde_json::to_vec(&req).expect("serialize");
client.write_all(&payload).await.expect("write");
client.shutdown().await.expect("shutdown");
handle.await.expect("join").expect("client");
drop(writer); // stop queue writer

let guard = receiver.recv().await.expect("recv");
let stored: CommentRequest = serde_json::from_slice(&guard).expect("parse");
assert_eq!(stored, req);
}

#[tokio::test]
async fn run_listener_accepts_connections() {
let dir = tempdir().expect("tempdir");
let cfg = Arc::new(Config {
github_token: "t".into(),
socket_path: dir.path().join("sock"),
queue_path: dir.path().join("q"),
cooldown_period_seconds: 1,
});

let (sender, mut receiver) = channel(&cfg.queue_path).expect("channel");
let (client_tx, writer_rx) = mpsc::unbounded_channel();
let (shutdown_tx, shutdown_rx) = watch::channel(());
let writer = tokio::spawn(queue_writer(sender, writer_rx));

let listener_task = tokio::spawn(run_listener(cfg.clone(), client_tx, shutdown_rx));

// Wait for socket to exist
for _ in 0..10 {
if cfg.socket_path.exists() {
break;
}
sleep(Duration::from_millis(10)).await;
}

let mut stream = UnixStream::connect(&cfg.socket_path)
.await
.expect("connect");
let req = CommentRequest {
owner: "o".into(),
repo: "r".into(),
pr_number: 1,
body: "b".into(),
};
let payload = serde_json::to_vec(&req).expect("serialize");
stream.write_all(&payload).await.expect("write");
stream.shutdown().await.expect("shutdown");

let guard = receiver.recv().await.expect("recv");
let stored: CommentRequest = serde_json::from_slice(&guard).expect("parse");
assert_eq!(stored, req);

listener_task.abort();
let _ = shutdown_tx.send(());
drop(writer);
}
}
5 changes: 4 additions & 1 deletion docs/comenq-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,10 @@ derives both `Serialize` and `Deserialize`. The daemon now spawns a Unix
listener and queue worker as described above. Structured logging is initialised
using `tracing_subscriber` with JSON output controlled by the `RUST_LOG`
environment variable. The queue directory is created asynchronously on start if
it does not already exist before `yaque` opens it.
it does not already exist before `yaque` opens it. Incoming requests are
forwarded from the listener to a dedicated queue writer task over a Tokio
`mpsc` channel. This task serializes writes to the `yaque::Sender`, preserving
single-writer semantics without per-connection locking.

## Works cited

Expand Down
10 changes: 5 additions & 5 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@

## Milestone 4: `comenqd` Daemon — UDS Listener Task

- [ ] Implement the `run_listener` async task.
- [x] Implement the `run_listener` async task.

- [ ] Bind a `tokio::net::UnixListener` to the configured socket path, ensuring
- [x] Bind a `tokio::net::UnixListener` to the configured socket path, ensuring
any stale socket file is removed first.

- [ ] Set the socket file permissions to `0o660` to enforce the security model.
- [x] Set the socket file permissions to `0o660` to enforce the security model.

- [ ] Create an acceptance loop (`listener.accept().await`) that spawns a new
- [x] Create an acceptance loop (`listener.accept().await`) that spawns a new
task for each incoming client connection.

- [ ] Implement the `handle_client` task to read the JSON payload, deserialize
- [x] Implement the `handle_client` task to read the JSON payload, deserialize
it into a `CommentRequest`, and enqueue it using the `yaque` sender.

## Milestone 5: `comenqd` Daemon — Queue Worker Task
Expand Down
3 changes: 2 additions & 1 deletion tests/cucumber.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod steps;
use cucumber::World as _;
use steps::{CliWorld, ClientWorld, CommentWorld, ConfigWorld};
use steps::{CliWorld, ClientWorld, CommentWorld, ConfigWorld, ListenerWorld};

#[tokio::main]
async fn main() {
Expand All @@ -9,5 +9,6 @@ async fn main() {
ClientWorld::run("tests/features/client_main.feature"),
CommentWorld::run("tests/features/comment_request.feature"),
ConfigWorld::run("tests/features/config.feature"),
ListenerWorld::run("tests/features/listener.feature"),
);
}
11 changes: 11 additions & 0 deletions tests/features/listener.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Feature: Daemon listener

Scenario: handling a valid request
Given a running listener task
When a client sends a valid request
Then the request is enqueued

Scenario: handling invalid JSON
Given a running listener task
When a client sends invalid JSON
Then the request is rejected
Loading