diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..fc2df694 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,75 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + workflow_call: + +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: -D warnings + +jobs: + fmt: + name: Format + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + - run: cargo fmt --check + + clippy: + name: Clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: clippy + - uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: clippy-${{ hashFiles('**/Cargo.lock') }} + restore-keys: clippy- + - run: cargo clippy --workspace -- -D warnings + + test: + name: Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: test-${{ hashFiles('**/Cargo.lock') }} + restore-keys: test- + - run: cargo test --workspace + + wasm: + name: WASM Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + targets: wasm32-unknown-unknown + - uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: wasm-${{ hashFiles('**/Cargo.lock') }} + restore-keys: wasm- + - run: cargo check --target wasm32-unknown-unknown --workspace --exclude willow-relay --exclude willow-worker --exclude willow-replay --exclude willow-storage diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 4d6ea40b..db628077 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -4,8 +4,17 @@ on: push: branches: [main] +env: + CARGO_TERM_COLOR: always + jobs: + ci: + name: CI + uses: ./.github/workflows/ci.yml + deploy: + name: Deploy + needs: ci runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index c15706d1..6454eec8 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -786,9 +786,9 @@ impl ClientHandle { } // Broadcast after releasing the borrow. for event in events_to_broadcast { - let _ = self.cmd_tx.unbounded_send( - network::NetworkCommand::BroadcastEvent { event, topic: None }, - ); + let _ = self + .cmd_tx + .unbounded_send(network::NetworkCommand::BroadcastEvent { event, topic: None }); } } @@ -4399,10 +4399,7 @@ mod tests { let (client, _rx) = test_client(); client.create_server("Worker Test").unwrap(); - client.authorize_workers(&[ - "worker-peer-1".to_string(), - "worker-peer-2".to_string(), - ]); + client.authorize_workers(&["worker-peer-1".to_string(), "worker-peer-2".to_string()]); let shared = client.shared.borrow(); assert!(shared diff --git a/crates/client/src/ops.rs b/crates/client/src/ops.rs index faf461c2..59456136 100644 --- a/crates/client/src/ops.rs +++ b/crates/client/src/ops.rs @@ -17,9 +17,7 @@ use serde::{Deserialize, Serialize}; // Re-export wire types from willow-common so existing imports still work. -pub use willow_common::{ - pack_wire, unpack_wire, VoiceSignalPayload, WireMessage, -}; +pub use willow_common::{pack_wire, unpack_wire, VoiceSignalPayload, WireMessage}; // ───── Join link types ────────────────────────────────────────────────────── diff --git a/crates/client/src/worker_cache.rs b/crates/client/src/worker_cache.rs index fff4a5db..7818f5c6 100644 --- a/crates/client/src/worker_cache.rs +++ b/crates/client/src/worker_cache.rs @@ -223,7 +223,10 @@ mod tests { #[test] fn worker_serving_multiple_servers() { let mut cache = WorkerCache::new(Duration::from_secs(30)); - cache.update(&make_replay_announcement("r1", vec!["srv-1", "srv-2", "srv-3"])); + cache.update(&make_replay_announcement( + "r1", + vec!["srv-1", "srv-2", "srv-3"], + )); assert_eq!(cache.replay_workers_for_server("srv-1").len(), 1); assert_eq!(cache.replay_workers_for_server("srv-2").len(), 1); diff --git a/crates/common/src/worker_types.rs b/crates/common/src/worker_types.rs index a1568cb8..2cb77190 100644 --- a/crates/common/src/worker_types.rs +++ b/crates/common/src/worker_types.rs @@ -69,7 +69,7 @@ pub enum WorkerWireMessage { Response { request_id: String, target_peer: String, - payload: WorkerResponse, + payload: Box, }, } @@ -98,7 +98,7 @@ pub enum WorkerResponse { SyncBatch { events: Vec }, /// Full state snapshot for far-behind peers. - Snapshot { state: ServerState }, + Snapshot { state: Box }, /// Paginated history results. HistoryPage { events: Vec, has_more: bool }, @@ -123,9 +123,10 @@ pub trait WorkerRole: Send + 'static { } /// Allocation strategy for which servers a worker serves. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub enum AllocationStrategy { /// Serve all discovered servers (initial implementation). + #[default] Global, /// Serve only specific servers (future). PerServer(Vec), @@ -133,12 +134,6 @@ pub enum AllocationStrategy { Dynamic, } -impl Default for AllocationStrategy { - fn default() -> Self { - AllocationStrategy::Global - } -} - #[cfg(test)] mod tests { use super::*; @@ -366,9 +361,9 @@ mod tests { let msg = WorkerWireMessage::Response { request_id: "req-456".to_string(), target_peer: "client-peer".to_string(), - payload: WorkerResponse::Denied { + payload: Box::new(WorkerResponse::Denied { reason: "unknown server".to_string(), - }, + }), }; let bytes = bincode::serialize(&msg).unwrap(); let decoded: WorkerWireMessage = bincode::deserialize(&bytes).unwrap(); diff --git a/crates/replay/src/main.rs b/crates/replay/src/main.rs index c65f2780..76e8b2e5 100644 --- a/crates/replay/src/main.rs +++ b/crates/replay/src/main.rs @@ -37,8 +37,7 @@ struct Cli { async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "info".into()), + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), ) .init(); diff --git a/crates/replay/src/role.rs b/crates/replay/src/role.rs index cfc6e524..fdd9bf0a 100644 --- a/crates/replay/src/role.rs +++ b/crates/replay/src/role.rs @@ -98,11 +98,7 @@ impl ReplayRole { impl WorkerRole for ReplayRole { fn role_info(&self) -> WorkerRoleInfo { - let total_events: u32 = self - .servers - .values() - .map(|s| s.events.len() as u32) - .sum(); + let total_events: u32 = self.servers.values().map(|s| s.events.len() as u32).sum(); WorkerRoleInfo::Replay { servers_loaded: self.servers.len() as u32, events_buffered: total_events, @@ -128,7 +124,7 @@ impl WorkerRole for ReplayRole { // Client is too far behind — send full snapshot. match self.servers.get(&server_id) { Some(data) => WorkerResponse::Snapshot { - state: data.state.clone(), + state: Box::new(data.state.clone()), }, None => WorkerResponse::Denied { reason: format!("unknown server: {server_id}"), diff --git a/crates/storage/src/main.rs b/crates/storage/src/main.rs index 298c0b07..85d325d0 100644 --- a/crates/storage/src/main.rs +++ b/crates/storage/src/main.rs @@ -39,8 +39,7 @@ struct Cli { async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "info".into()), + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), ) .init(); diff --git a/crates/storage/src/role.rs b/crates/storage/src/role.rs index d2876096..79f98f74 100644 --- a/crates/storage/src/role.rs +++ b/crates/storage/src/role.rs @@ -65,7 +65,10 @@ impl WorkerRole for StorageRole { channel, before_timestamp, limit, - } => match self.store.history(&server_id, &channel, before_timestamp, limit) { + } => match self + .store + .history(&server_id, &channel, before_timestamp, limit) + { Ok((events, has_more)) => WorkerResponse::HistoryPage { events, has_more }, Err(e) => WorkerResponse::Denied { reason: format!("query failed: {e}"), @@ -173,7 +176,8 @@ mod tests { match role.role_info() { WorkerRoleInfo::Storage { - total_events_stored, .. + total_events_stored, + .. } => assert_eq!(total_events_stored, 1), _ => panic!("expected Storage"), } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 0135172f..8bd3606b 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -129,11 +129,11 @@ impl StorageEventStore { /// Number of distinct servers tracked. pub fn server_count(&self) -> anyhow::Result { - let count: i64 = self.conn.query_row( - "SELECT COUNT(DISTINCT server_id) FROM events", - [], - |row| row.get(0), - )?; + let count: i64 = + self.conn + .query_row("SELECT COUNT(DISTINCT server_id) FROM events", [], |row| { + row.get(0) + })?; Ok(count as u32) } } @@ -193,7 +193,10 @@ mod tests { let store = StorageEventStore::open(":memory:").unwrap(); for i in 0..5u64 { store - .store_event("srv-1", &make_message(&format!("e{i}"), "general", (i + 1) * 1000)) + .store_event( + "srv-1", + &make_message(&format!("e{i}"), "general", (i + 1) * 1000), + ) .unwrap(); } @@ -210,7 +213,10 @@ mod tests { let store = StorageEventStore::open(":memory:").unwrap(); for i in 0..10u64 { store - .store_event("srv-1", &make_message(&format!("e{i}"), "general", (i + 1) * 1000)) + .store_event( + "srv-1", + &make_message(&format!("e{i}"), "general", (i + 1) * 1000), + ) .unwrap(); } @@ -239,9 +245,15 @@ mod tests { #[test] fn history_filters_by_channel() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); - store.store_event("srv-1", &make_message("e2", "random", 2000)).unwrap(); - store.store_event("srv-1", &make_message("e3", "general", 3000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); + store + .store_event("srv-1", &make_message("e2", "random", 2000)) + .unwrap(); + store + .store_event("srv-1", &make_message("e3", "general", 3000)) + .unwrap(); let (events, _) = store.history("srv-1", "general", None, 10).unwrap(); assert_eq!(events.len(), 2); @@ -250,8 +262,12 @@ mod tests { #[test] fn history_filters_by_server() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); - store.store_event("srv-2", &make_message("e2", "general", 2000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); + store + .store_event("srv-2", &make_message("e2", "general", 2000)) + .unwrap(); let (events, _) = store.history("srv-1", "general", None, 10).unwrap(); assert_eq!(events.len(), 1); @@ -261,9 +277,15 @@ mod tests { #[test] fn server_count_tracks_distinct_servers() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); - store.store_event("srv-2", &make_message("e2", "general", 2000)).unwrap(); - store.store_event("srv-1", &make_message("e3", "general", 3000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); + store + .store_event("srv-2", &make_message("e2", "general", 2000)) + .unwrap(); + store + .store_event("srv-1", &make_message("e3", "general", 3000)) + .unwrap(); assert_eq!(store.server_count().unwrap(), 2); } @@ -271,7 +293,9 @@ mod tests { #[test] fn disk_usage_returns_nonzero_after_insert() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); assert!(store.disk_usage_bytes().unwrap() > 0); } diff --git a/crates/web/src/icons.rs b/crates/web/src/icons.rs index 0ed2b6dd..9e9c67f0 100644 --- a/crates/web/src/icons.rs +++ b/crates/web/src/icons.rs @@ -372,9 +372,7 @@ pub fn icon_database() -> impl IntoView { /// Activity/pulse icon (heartbeat line). pub fn icon_activity() -> impl IntoView { icon( - &format!( - r#""# - ), + &format!(r#""#), "icon-activity", ) } diff --git a/crates/worker/src/actors/network.rs b/crates/worker/src/actors/network.rs index 145a9c06..3675ce49 100644 --- a/crates/worker/src/actors/network.rs +++ b/crates/worker/src/actors/network.rs @@ -134,9 +134,7 @@ pub fn parse_server_message(data: &[u8]) -> ServerMessageAction { if let Some((wire_msg, _signer)) = willow_common::unpack_wire(data) { match wire_msg { willow_common::WireMessage::Event(event) => ServerMessageAction::Events(vec![event]), - willow_common::WireMessage::SyncBatch { events } => { - ServerMessageAction::Events(events) - } + willow_common::WireMessage::SyncBatch { events } => ServerMessageAction::Events(events), _ => ServerMessageAction::Ignore, } } else { @@ -170,27 +168,23 @@ async fn handle_incoming_message( return; } - let resp = match tokio::time::timeout( - std::time::Duration::from_secs(5), - reply_rx, - ) - .await - { - Ok(Ok(resp)) => resp, - Ok(Err(_)) => { - warn!(%request_id, "state actor dropped reply channel"); - return; - } - Err(_) => { - warn!(%request_id, "request timed out after 5s"); - return; - } - }; + let resp = + match tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx).await { + Ok(Ok(resp)) => resp, + Ok(Err(_)) => { + warn!(%request_id, "state actor dropped reply channel"); + return; + } + Err(_) => { + warn!(%request_id, "request timed out after 5s"); + return; + } + }; let response_msg = WorkerWireMessage::Response { request_id: request_id.clone(), target_peer: String::new(), - payload: resp, + payload: Box::new(resp), }; if let Ok(bytes) = bincode::serialize(&response_msg) { if let Err(e) = node.publish(WORKERS_TOPIC, bytes) { @@ -319,9 +313,9 @@ mod tests { let msg = WorkerWireMessage::Response { request_id: "r1".to_string(), target_peer: "my-peer".to_string(), - payload: WorkerResponse::Denied { + payload: Box::new(WorkerResponse::Denied { reason: "test".to_string(), - }, + }), }; let data = bincode::serialize(&msg).unwrap(); @@ -362,9 +356,8 @@ mod tests { }, }; - let data = - willow_common::pack_wire(&willow_common::WireMessage::Event(event.clone()), &id) - .unwrap(); + let data = willow_common::pack_wire(&willow_common::WireMessage::Event(event.clone()), &id) + .unwrap(); match parse_server_message(&data) { ServerMessageAction::Events(events) => { @@ -403,9 +396,8 @@ mod tests { }, ]; - let data = - willow_common::pack_wire(&willow_common::WireMessage::SyncBatch { events }, &id) - .unwrap(); + let data = willow_common::pack_wire(&willow_common::WireMessage::SyncBatch { events }, &id) + .unwrap(); match parse_server_message(&data) { ServerMessageAction::Events(events) => assert_eq!(events.len(), 2), diff --git a/crates/worker/tests/integration.rs b/crates/worker/tests/integration.rs index 7e651ea3..fd67770e 100644 --- a/crates/worker/tests/integration.rs +++ b/crates/worker/tests/integration.rs @@ -54,7 +54,7 @@ impl WorkerRole for TestReplayRole { } } else { WorkerResponse::Snapshot { - state: self.state.clone(), + state: Box::new(self.state.clone()), } } } @@ -88,9 +88,12 @@ async fn state_actor_with_replay_role_full_flow() { // 1. Ingest 5 events. for i in 0..5u64 { - tx.send(StateMsg::Event(make_message(&format!("e{i}"), (i + 1) * 1000))) - .await - .unwrap(); + tx.send(StateMsg::Event(make_message( + &format!("e{i}"), + (i + 1) * 1000, + ))) + .await + .unwrap(); } // 2. Verify role info shows 5 buffered events. @@ -170,8 +173,7 @@ async fn heartbeat_and_state_actor_interaction() { match msg { NetworkOutMsg::Publish { data, .. } => { - let decoded: willow_common::WorkerWireMessage = - bincode::deserialize(&data).unwrap(); + let decoded: willow_common::WorkerWireMessage = bincode::deserialize(&data).unwrap(); match decoded { willow_common::WorkerWireMessage::Announcement(a) => { assert_eq!(a.peer_id, "test-worker"); @@ -238,9 +240,12 @@ async fn events_applied_then_queried_via_request() { // Ingest 10 events into a buffer of size 5. for i in 0..10u64 { - tx.send(StateMsg::Event(make_message(&format!("e{i}"), (i + 1) * 1000))) - .await - .unwrap(); + tx.send(StateMsg::Event(make_message( + &format!("e{i}"), + (i + 1) * 1000, + ))) + .await + .unwrap(); } // Query — should only get 5 (buffer evicted oldest). @@ -294,8 +299,7 @@ async fn graceful_shutdown_sends_departure() { match msg { NetworkOutMsg::Publish { data, .. } => { - let decoded: willow_common::WorkerWireMessage = - bincode::deserialize(&data).unwrap(); + let decoded: willow_common::WorkerWireMessage = bincode::deserialize(&data).unwrap(); match decoded { willow_common::WorkerWireMessage::Departure { peer_id } => { assert_eq!(peer_id, "departing-worker"); @@ -344,7 +348,10 @@ async fn full_actor_orchestration_without_network() { // Ingest some events. for i in 0..3u64 { state_tx - .send(StateMsg::Event(make_message(&format!("orch-{i}"), (i + 1) * 1000))) + .send(StateMsg::Event(make_message( + &format!("orch-{i}"), + (i + 1) * 1000, + ))) .await .unwrap(); } @@ -356,8 +363,7 @@ async fn full_actor_orchestration_without_network() { match tokio::time::timeout(Duration::from_millis(30), network_rx.recv()).await { Ok(Some(NetworkOutMsg::Publish { data, .. })) => { // Could be announcement or sync request. - if let Ok(msg) = bincode::deserialize::(&data) - { + if let Ok(msg) = bincode::deserialize::(&data) { match msg { willow_common::WorkerWireMessage::Announcement(_) => { announcement_count += 1;