From 6f193c2d3b5d27018e26e6dabf9a786e8c180de8 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 29 Mar 2026 10:28:01 +0000 Subject: [PATCH 1/2] ci: add CI pipeline and gate deploy on passing checks Add a CI workflow (fmt, clippy, test, WASM check) that runs on PRs and pushes to main. Update the deploy workflow to depend on CI passing before deploying. Fix formatting issues across the workspace. https://claude.ai/code/session_019cmP1WsNbaf7AYLH7Naxgm --- .github/workflows/ci.yml | 75 +++++++++++++++++++++++++++++ .github/workflows/deploy.yml | 9 ++++ crates/client/src/lib.rs | 11 ++--- crates/client/src/ops.rs | 4 +- crates/client/src/worker_cache.rs | 5 +- crates/replay/src/main.rs | 3 +- crates/replay/src/role.rs | 6 +-- crates/storage/src/main.rs | 3 +- crates/storage/src/role.rs | 8 ++- crates/storage/src/store.rs | 56 +++++++++++++++------ crates/web/src/icons.rs | 4 +- crates/worker/src/actors/network.rs | 42 +++++++--------- crates/worker/tests/integration.rs | 32 +++++++----- 13 files changed, 179 insertions(+), 79 deletions(-) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..fc2df694 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,75 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + workflow_call: + +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: -D warnings + +jobs: + fmt: + name: Format + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + - run: cargo fmt --check + + clippy: + name: Clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: clippy + - uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: clippy-${{ hashFiles('**/Cargo.lock') }} + restore-keys: clippy- + - run: cargo clippy --workspace -- -D warnings + + test: + name: Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: test-${{ hashFiles('**/Cargo.lock') }} + restore-keys: test- + - run: cargo test --workspace + + wasm: + name: WASM Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + targets: wasm32-unknown-unknown + - uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: wasm-${{ hashFiles('**/Cargo.lock') }} + restore-keys: wasm- + - run: cargo check --target wasm32-unknown-unknown --workspace --exclude willow-relay --exclude willow-worker --exclude willow-replay --exclude willow-storage diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 4d6ea40b..db628077 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -4,8 +4,17 @@ on: push: branches: [main] +env: + CARGO_TERM_COLOR: always + jobs: + ci: + name: CI + uses: ./.github/workflows/ci.yml + deploy: + name: Deploy + needs: ci runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index c15706d1..6454eec8 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -786,9 +786,9 @@ impl ClientHandle { } // Broadcast after releasing the borrow. for event in events_to_broadcast { - let _ = self.cmd_tx.unbounded_send( - network::NetworkCommand::BroadcastEvent { event, topic: None }, - ); + let _ = self + .cmd_tx + .unbounded_send(network::NetworkCommand::BroadcastEvent { event, topic: None }); } } @@ -4399,10 +4399,7 @@ mod tests { let (client, _rx) = test_client(); client.create_server("Worker Test").unwrap(); - client.authorize_workers(&[ - "worker-peer-1".to_string(), - "worker-peer-2".to_string(), - ]); + client.authorize_workers(&["worker-peer-1".to_string(), "worker-peer-2".to_string()]); let shared = client.shared.borrow(); assert!(shared diff --git a/crates/client/src/ops.rs b/crates/client/src/ops.rs index faf461c2..59456136 100644 --- a/crates/client/src/ops.rs +++ b/crates/client/src/ops.rs @@ -17,9 +17,7 @@ use serde::{Deserialize, Serialize}; // Re-export wire types from willow-common so existing imports still work. -pub use willow_common::{ - pack_wire, unpack_wire, VoiceSignalPayload, WireMessage, -}; +pub use willow_common::{pack_wire, unpack_wire, VoiceSignalPayload, WireMessage}; // ───── Join link types ────────────────────────────────────────────────────── diff --git a/crates/client/src/worker_cache.rs b/crates/client/src/worker_cache.rs index fff4a5db..7818f5c6 100644 --- a/crates/client/src/worker_cache.rs +++ b/crates/client/src/worker_cache.rs @@ -223,7 +223,10 @@ mod tests { #[test] fn worker_serving_multiple_servers() { let mut cache = WorkerCache::new(Duration::from_secs(30)); - cache.update(&make_replay_announcement("r1", vec!["srv-1", "srv-2", "srv-3"])); + cache.update(&make_replay_announcement( + "r1", + vec!["srv-1", "srv-2", "srv-3"], + )); assert_eq!(cache.replay_workers_for_server("srv-1").len(), 1); assert_eq!(cache.replay_workers_for_server("srv-2").len(), 1); diff --git a/crates/replay/src/main.rs b/crates/replay/src/main.rs index c65f2780..76e8b2e5 100644 --- a/crates/replay/src/main.rs +++ b/crates/replay/src/main.rs @@ -37,8 +37,7 @@ struct Cli { async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "info".into()), + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), ) .init(); diff --git a/crates/replay/src/role.rs b/crates/replay/src/role.rs index cfc6e524..40aaaf40 100644 --- a/crates/replay/src/role.rs +++ b/crates/replay/src/role.rs @@ -98,11 +98,7 @@ impl ReplayRole { impl WorkerRole for ReplayRole { fn role_info(&self) -> WorkerRoleInfo { - let total_events: u32 = self - .servers - .values() - .map(|s| s.events.len() as u32) - .sum(); + let total_events: u32 = self.servers.values().map(|s| s.events.len() as u32).sum(); WorkerRoleInfo::Replay { servers_loaded: self.servers.len() as u32, events_buffered: total_events, diff --git a/crates/storage/src/main.rs b/crates/storage/src/main.rs index 298c0b07..85d325d0 100644 --- a/crates/storage/src/main.rs +++ b/crates/storage/src/main.rs @@ -39,8 +39,7 @@ struct Cli { async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "info".into()), + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), ) .init(); diff --git a/crates/storage/src/role.rs b/crates/storage/src/role.rs index d2876096..79f98f74 100644 --- a/crates/storage/src/role.rs +++ b/crates/storage/src/role.rs @@ -65,7 +65,10 @@ impl WorkerRole for StorageRole { channel, before_timestamp, limit, - } => match self.store.history(&server_id, &channel, before_timestamp, limit) { + } => match self + .store + .history(&server_id, &channel, before_timestamp, limit) + { Ok((events, has_more)) => WorkerResponse::HistoryPage { events, has_more }, Err(e) => WorkerResponse::Denied { reason: format!("query failed: {e}"), @@ -173,7 +176,8 @@ mod tests { match role.role_info() { WorkerRoleInfo::Storage { - total_events_stored, .. + total_events_stored, + .. } => assert_eq!(total_events_stored, 1), _ => panic!("expected Storage"), } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 0135172f..8bd3606b 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -129,11 +129,11 @@ impl StorageEventStore { /// Number of distinct servers tracked. pub fn server_count(&self) -> anyhow::Result { - let count: i64 = self.conn.query_row( - "SELECT COUNT(DISTINCT server_id) FROM events", - [], - |row| row.get(0), - )?; + let count: i64 = + self.conn + .query_row("SELECT COUNT(DISTINCT server_id) FROM events", [], |row| { + row.get(0) + })?; Ok(count as u32) } } @@ -193,7 +193,10 @@ mod tests { let store = StorageEventStore::open(":memory:").unwrap(); for i in 0..5u64 { store - .store_event("srv-1", &make_message(&format!("e{i}"), "general", (i + 1) * 1000)) + .store_event( + "srv-1", + &make_message(&format!("e{i}"), "general", (i + 1) * 1000), + ) .unwrap(); } @@ -210,7 +213,10 @@ mod tests { let store = StorageEventStore::open(":memory:").unwrap(); for i in 0..10u64 { store - .store_event("srv-1", &make_message(&format!("e{i}"), "general", (i + 1) * 1000)) + .store_event( + "srv-1", + &make_message(&format!("e{i}"), "general", (i + 1) * 1000), + ) .unwrap(); } @@ -239,9 +245,15 @@ mod tests { #[test] fn history_filters_by_channel() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); - store.store_event("srv-1", &make_message("e2", "random", 2000)).unwrap(); - store.store_event("srv-1", &make_message("e3", "general", 3000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); + store + .store_event("srv-1", &make_message("e2", "random", 2000)) + .unwrap(); + store + .store_event("srv-1", &make_message("e3", "general", 3000)) + .unwrap(); let (events, _) = store.history("srv-1", "general", None, 10).unwrap(); assert_eq!(events.len(), 2); @@ -250,8 +262,12 @@ mod tests { #[test] fn history_filters_by_server() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); - store.store_event("srv-2", &make_message("e2", "general", 2000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); + store + .store_event("srv-2", &make_message("e2", "general", 2000)) + .unwrap(); let (events, _) = store.history("srv-1", "general", None, 10).unwrap(); assert_eq!(events.len(), 1); @@ -261,9 +277,15 @@ mod tests { #[test] fn server_count_tracks_distinct_servers() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); - store.store_event("srv-2", &make_message("e2", "general", 2000)).unwrap(); - store.store_event("srv-1", &make_message("e3", "general", 3000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); + store + .store_event("srv-2", &make_message("e2", "general", 2000)) + .unwrap(); + store + .store_event("srv-1", &make_message("e3", "general", 3000)) + .unwrap(); assert_eq!(store.server_count().unwrap(), 2); } @@ -271,7 +293,9 @@ mod tests { #[test] fn disk_usage_returns_nonzero_after_insert() { let store = StorageEventStore::open(":memory:").unwrap(); - store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap(); + store + .store_event("srv-1", &make_message("e1", "general", 1000)) + .unwrap(); assert!(store.disk_usage_bytes().unwrap() > 0); } diff --git a/crates/web/src/icons.rs b/crates/web/src/icons.rs index 0ed2b6dd..9e9c67f0 100644 --- a/crates/web/src/icons.rs +++ b/crates/web/src/icons.rs @@ -372,9 +372,7 @@ pub fn icon_database() -> impl IntoView { /// Activity/pulse icon (heartbeat line). pub fn icon_activity() -> impl IntoView { icon( - &format!( - r#""# - ), + &format!(r#""#), "icon-activity", ) } diff --git a/crates/worker/src/actors/network.rs b/crates/worker/src/actors/network.rs index 145a9c06..c372d54a 100644 --- a/crates/worker/src/actors/network.rs +++ b/crates/worker/src/actors/network.rs @@ -134,9 +134,7 @@ pub fn parse_server_message(data: &[u8]) -> ServerMessageAction { if let Some((wire_msg, _signer)) = willow_common::unpack_wire(data) { match wire_msg { willow_common::WireMessage::Event(event) => ServerMessageAction::Events(vec![event]), - willow_common::WireMessage::SyncBatch { events } => { - ServerMessageAction::Events(events) - } + willow_common::WireMessage::SyncBatch { events } => ServerMessageAction::Events(events), _ => ServerMessageAction::Ignore, } } else { @@ -170,22 +168,18 @@ async fn handle_incoming_message( return; } - let resp = match tokio::time::timeout( - std::time::Duration::from_secs(5), - reply_rx, - ) - .await - { - Ok(Ok(resp)) => resp, - Ok(Err(_)) => { - warn!(%request_id, "state actor dropped reply channel"); - return; - } - Err(_) => { - warn!(%request_id, "request timed out after 5s"); - return; - } - }; + let resp = + match tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx).await { + Ok(Ok(resp)) => resp, + Ok(Err(_)) => { + warn!(%request_id, "state actor dropped reply channel"); + return; + } + Err(_) => { + warn!(%request_id, "request timed out after 5s"); + return; + } + }; let response_msg = WorkerWireMessage::Response { request_id: request_id.clone(), @@ -362,9 +356,8 @@ mod tests { }, }; - let data = - willow_common::pack_wire(&willow_common::WireMessage::Event(event.clone()), &id) - .unwrap(); + let data = willow_common::pack_wire(&willow_common::WireMessage::Event(event.clone()), &id) + .unwrap(); match parse_server_message(&data) { ServerMessageAction::Events(events) => { @@ -403,9 +396,8 @@ mod tests { }, ]; - let data = - willow_common::pack_wire(&willow_common::WireMessage::SyncBatch { events }, &id) - .unwrap(); + let data = willow_common::pack_wire(&willow_common::WireMessage::SyncBatch { events }, &id) + .unwrap(); match parse_server_message(&data) { ServerMessageAction::Events(events) => assert_eq!(events.len(), 2), diff --git a/crates/worker/tests/integration.rs b/crates/worker/tests/integration.rs index 7e651ea3..c381f0f2 100644 --- a/crates/worker/tests/integration.rs +++ b/crates/worker/tests/integration.rs @@ -88,9 +88,12 @@ async fn state_actor_with_replay_role_full_flow() { // 1. Ingest 5 events. for i in 0..5u64 { - tx.send(StateMsg::Event(make_message(&format!("e{i}"), (i + 1) * 1000))) - .await - .unwrap(); + tx.send(StateMsg::Event(make_message( + &format!("e{i}"), + (i + 1) * 1000, + ))) + .await + .unwrap(); } // 2. Verify role info shows 5 buffered events. @@ -170,8 +173,7 @@ async fn heartbeat_and_state_actor_interaction() { match msg { NetworkOutMsg::Publish { data, .. } => { - let decoded: willow_common::WorkerWireMessage = - bincode::deserialize(&data).unwrap(); + let decoded: willow_common::WorkerWireMessage = bincode::deserialize(&data).unwrap(); match decoded { willow_common::WorkerWireMessage::Announcement(a) => { assert_eq!(a.peer_id, "test-worker"); @@ -238,9 +240,12 @@ async fn events_applied_then_queried_via_request() { // Ingest 10 events into a buffer of size 5. for i in 0..10u64 { - tx.send(StateMsg::Event(make_message(&format!("e{i}"), (i + 1) * 1000))) - .await - .unwrap(); + tx.send(StateMsg::Event(make_message( + &format!("e{i}"), + (i + 1) * 1000, + ))) + .await + .unwrap(); } // Query — should only get 5 (buffer evicted oldest). @@ -294,8 +299,7 @@ async fn graceful_shutdown_sends_departure() { match msg { NetworkOutMsg::Publish { data, .. } => { - let decoded: willow_common::WorkerWireMessage = - bincode::deserialize(&data).unwrap(); + let decoded: willow_common::WorkerWireMessage = bincode::deserialize(&data).unwrap(); match decoded { willow_common::WorkerWireMessage::Departure { peer_id } => { assert_eq!(peer_id, "departing-worker"); @@ -344,7 +348,10 @@ async fn full_actor_orchestration_without_network() { // Ingest some events. for i in 0..3u64 { state_tx - .send(StateMsg::Event(make_message(&format!("orch-{i}"), (i + 1) * 1000))) + .send(StateMsg::Event(make_message( + &format!("orch-{i}"), + (i + 1) * 1000, + ))) .await .unwrap(); } @@ -356,8 +363,7 @@ async fn full_actor_orchestration_without_network() { match tokio::time::timeout(Duration::from_millis(30), network_rx.recv()).await { Ok(Some(NetworkOutMsg::Publish { data, .. })) => { // Could be announcement or sync request. - if let Ok(msg) = bincode::deserialize::(&data) - { + if let Ok(msg) = bincode::deserialize::(&data) { match msg { willow_common::WorkerWireMessage::Announcement(_) => { announcement_count += 1; From 2ddd2f6c56ef61b776bb9e6d283f947aaf6cd327 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 29 Mar 2026 10:32:14 +0000 Subject: [PATCH 2/2] fix: resolve clippy errors in willow-common for CI Box large enum variants (WorkerResponse in WorkerWireMessage::Response, ServerState in WorkerResponse::Snapshot) and derive Default for AllocationStrategy to satisfy clippy -D warnings. https://claude.ai/code/session_019cmP1WsNbaf7AYLH7Naxgm --- crates/common/src/worker_types.rs | 17 ++++++----------- crates/replay/src/role.rs | 2 +- crates/worker/src/actors/network.rs | 6 +++--- crates/worker/tests/integration.rs | 2 +- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/crates/common/src/worker_types.rs b/crates/common/src/worker_types.rs index a1568cb8..2cb77190 100644 --- a/crates/common/src/worker_types.rs +++ b/crates/common/src/worker_types.rs @@ -69,7 +69,7 @@ pub enum WorkerWireMessage { Response { request_id: String, target_peer: String, - payload: WorkerResponse, + payload: Box, }, } @@ -98,7 +98,7 @@ pub enum WorkerResponse { SyncBatch { events: Vec }, /// Full state snapshot for far-behind peers. - Snapshot { state: ServerState }, + Snapshot { state: Box }, /// Paginated history results. HistoryPage { events: Vec, has_more: bool }, @@ -123,9 +123,10 @@ pub trait WorkerRole: Send + 'static { } /// Allocation strategy for which servers a worker serves. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub enum AllocationStrategy { /// Serve all discovered servers (initial implementation). + #[default] Global, /// Serve only specific servers (future). PerServer(Vec), @@ -133,12 +134,6 @@ pub enum AllocationStrategy { Dynamic, } -impl Default for AllocationStrategy { - fn default() -> Self { - AllocationStrategy::Global - } -} - #[cfg(test)] mod tests { use super::*; @@ -366,9 +361,9 @@ mod tests { let msg = WorkerWireMessage::Response { request_id: "req-456".to_string(), target_peer: "client-peer".to_string(), - payload: WorkerResponse::Denied { + payload: Box::new(WorkerResponse::Denied { reason: "unknown server".to_string(), - }, + }), }; let bytes = bincode::serialize(&msg).unwrap(); let decoded: WorkerWireMessage = bincode::deserialize(&bytes).unwrap(); diff --git a/crates/replay/src/role.rs b/crates/replay/src/role.rs index 40aaaf40..fdd9bf0a 100644 --- a/crates/replay/src/role.rs +++ b/crates/replay/src/role.rs @@ -124,7 +124,7 @@ impl WorkerRole for ReplayRole { // Client is too far behind — send full snapshot. match self.servers.get(&server_id) { Some(data) => WorkerResponse::Snapshot { - state: data.state.clone(), + state: Box::new(data.state.clone()), }, None => WorkerResponse::Denied { reason: format!("unknown server: {server_id}"), diff --git a/crates/worker/src/actors/network.rs b/crates/worker/src/actors/network.rs index c372d54a..3675ce49 100644 --- a/crates/worker/src/actors/network.rs +++ b/crates/worker/src/actors/network.rs @@ -184,7 +184,7 @@ async fn handle_incoming_message( let response_msg = WorkerWireMessage::Response { request_id: request_id.clone(), target_peer: String::new(), - payload: resp, + payload: Box::new(resp), }; if let Ok(bytes) = bincode::serialize(&response_msg) { if let Err(e) = node.publish(WORKERS_TOPIC, bytes) { @@ -313,9 +313,9 @@ mod tests { let msg = WorkerWireMessage::Response { request_id: "r1".to_string(), target_peer: "my-peer".to_string(), - payload: WorkerResponse::Denied { + payload: Box::new(WorkerResponse::Denied { reason: "test".to_string(), - }, + }), }; let data = bincode::serialize(&msg).unwrap(); diff --git a/crates/worker/tests/integration.rs b/crates/worker/tests/integration.rs index c381f0f2..fd67770e 100644 --- a/crates/worker/tests/integration.rs +++ b/crates/worker/tests/integration.rs @@ -54,7 +54,7 @@ impl WorkerRole for TestReplayRole { } } else { WorkerResponse::Snapshot { - state: self.state.clone(), + state: Box::new(self.state.clone()), } } }