Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 12 additions & 33 deletions crates/comenqd/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//!
//! This module provides the run function used by `main` which spawns the
//! Unix socket listener and the queue worker.

use crate::config::Config;
use anyhow::Result;
use comenq_lib::CommentRequest;
Expand All @@ -17,7 +16,6 @@ use tokio::io::AsyncReadExt;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{mpsc, watch};
use yaque::{Receiver, Sender, channel};

fn build_octocrab(token: &str) -> Result<Octocrab> {
Ok(Octocrab::builder()
.personal_token(token.to_string())
Expand All @@ -32,7 +30,6 @@ fn prepare_listener(path: &Path) -> Result<UnixListener> {
stdfs::set_permissions(path, stdfs::Permissions::from_mode(0o660))?;
Ok(listener)
}

async fn ensure_queue_dir(path: &Path) -> Result<()> {
fs::create_dir_all(path).await?;
Ok(())
Expand Down Expand Up @@ -89,7 +86,6 @@ pub async fn run(config: Config) -> Result<()> {
let (client_tx, client_rx) = mpsc::unbounded_channel();
let cfg = Arc::new(config);
let (shutdown_tx, shutdown_rx) = watch::channel(());

let writer = tokio::spawn(queue_writer(queue_tx, client_rx));
let listener = tokio::spawn(run_listener(cfg.clone(), client_tx, shutdown_rx));
let worker = tokio::spawn(run_worker(cfg.clone(), rx, octocrab));
Expand All @@ -104,10 +100,8 @@ pub async fn run(config: Config) -> Result<()> {
Err(e) => return Err(e.into()),
},
}

let _ = shutdown_tx.send(());
writer.await??;

Ok(())
}

Expand Down Expand Up @@ -259,12 +253,19 @@ mod tests {
use tokio::time::{Duration, sleep};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
fn cfg_with_cooldown(dir: &TempDir, secs: u64) -> Config {
Config {
cooldown_period_seconds: secs,
..temp_config(dir)
}
}

async fn setup_run_worker(status: u16) -> (MockServer, Arc<Config>, Receiver, Arc<Octocrab>) {
let dir = tempdir().expect("tempdir");
let mut base = temp_config(&dir);
base.cooldown_period_seconds = 0;
let cfg = Arc::new(base);
let cfg = Arc::new(Config {
cooldown_period_seconds: 0,
..temp_config(&dir)
});
let (sender, rx) = channel(&cfg.queue_path).expect("channel");
let req = CommentRequest {
owner: "o".into(),
Expand All @@ -285,7 +286,6 @@ mod tests {
.await;

let octo = octocrab_for(&server);

(server, cfg, rx, octo)
}

Expand All @@ -302,17 +302,10 @@ mod tests {
#[tokio::test]
async fn run_creates_queue_directory() {
let dir = tempdir().expect("Failed to create temporary directory");
let cfg = Config {
cooldown_period_seconds: 1,
..temp_config(&dir)
};

let cfg = cfg_with_cooldown(&dir, 1);
assert!(!cfg.queue_path.exists());

let handle = tokio::spawn(run(cfg.clone()));

wait_for_file(&cfg.queue_path, 200, Duration::from_millis(10)).await;

handle.abort();
assert!(cfg.queue_path.is_dir(), "queue directory not created");
}
Expand All @@ -325,7 +318,6 @@ mod tests {

let listener = prepare_listener(&sock).expect("prepare listener");
drop(listener);

let meta = stdfs::metadata(&sock).expect("metadata");
assert_eq!(meta.permissions().mode() & 0o777, 0o660);
}
Expand All @@ -340,7 +332,6 @@ mod tests {

let (mut client, server) = UnixStream::pair().expect("pair");
let handle = tokio::spawn(handle_client(server, client_tx));

let req = CommentRequest {
owner: "o".into(),
repo: "r".into(),
Expand All @@ -352,7 +343,6 @@ mod tests {
client.shutdown().await.expect("shutdown");
handle.await.expect("join").expect("client");
drop(writer); // stop queue writer

let guard = receiver.recv().await.expect("recv");
let stored: CommentRequest = serde_json::from_slice(&guard).expect("parse");
assert_eq!(stored, req);
Expand All @@ -361,20 +351,13 @@ mod tests {
#[tokio::test]
async fn run_listener_accepts_connections() {
let dir = tempdir().expect("tempdir");
let cfg = Arc::new(Config {
cooldown_period_seconds: 1,
..temp_config(&dir)
});

let cfg = Arc::new(cfg_with_cooldown(&dir, 1));
let (sender, mut receiver) = channel(&cfg.queue_path).expect("channel");
let (client_tx, writer_rx) = mpsc::unbounded_channel();
let (shutdown_tx, shutdown_rx) = watch::channel(());
let writer = tokio::spawn(queue_writer(sender, writer_rx));

let listener_task = tokio::spawn(run_listener(cfg.clone(), client_tx, shutdown_rx));

wait_for_file(&cfg.socket_path, 10, Duration::from_millis(10)).await;

let mut stream = UnixStream::connect(&cfg.socket_path)
.await
.expect("connect");
Expand All @@ -387,11 +370,9 @@ mod tests {
let payload = serde_json::to_vec(&req).expect("serialize");
stream.write_all(&payload).await.expect("write");
stream.shutdown().await.expect("shutdown");

let guard = receiver.recv().await.expect("recv");
let stored: CommentRequest = serde_json::from_slice(&guard).expect("parse");
assert_eq!(stored, req);

listener_task.abort();
let _ = shutdown_tx.send(());
drop(writer);
Expand All @@ -403,7 +384,6 @@ mod tests {
let h = tokio::spawn(run_worker(cfg.clone(), rx, octo));
sleep(Duration::from_millis(50)).await;
h.abort();

assert_eq!(server.received_requests().await.unwrap().len(), 1);
assert_eq!(std::fs::read_dir(&cfg.queue_path).unwrap().count(), 0);
}
Expand All @@ -414,7 +394,6 @@ mod tests {
let h = tokio::spawn(run_worker(cfg.clone(), rx, octo));
sleep(Duration::from_millis(50)).await;
h.abort();

assert_eq!(server.received_requests().await.unwrap().len(), 1);
assert!(std::fs::read_dir(&cfg.queue_path).unwrap().count() > 0);
}
Expand Down
2 changes: 1 addition & 1 deletion test-support/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ tokio = { workspace = true }
comenqd = { path = "../crates/comenqd" }
octocrab = { workspace = true }
tempfile = { workspace = true }
wiremock = "0.6"
wiremock = "^0.6"
1 change: 0 additions & 1 deletion tests/cucumber.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
mod steps;
mod util;
mod support;
mod util;
use cucumber::World as _;
Expand Down