Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
name: CI

on:
push:
branches: [main]
pull_request:
branches: [main]
workflow_call:

env:
CARGO_TERM_COLOR: always
RUSTFLAGS: -D warnings

jobs:
fmt:
name: Format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt
- run: cargo fmt --check

clippy:
name: Clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: clippy-${{ hashFiles('**/Cargo.lock') }}
restore-keys: clippy-
- run: cargo clippy --workspace -- -D warnings

test:
name: Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: test-${{ hashFiles('**/Cargo.lock') }}
restore-keys: test-
- run: cargo test --workspace

wasm:
name: WASM Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
targets: wasm32-unknown-unknown
- uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: wasm-${{ hashFiles('**/Cargo.lock') }}
restore-keys: wasm-
- run: cargo check --target wasm32-unknown-unknown --workspace --exclude willow-relay --exclude willow-worker --exclude willow-replay --exclude willow-storage
9 changes: 9 additions & 0 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,17 @@ on:
push:
branches: [main]

env:
CARGO_TERM_COLOR: always

jobs:
ci:
name: CI
uses: ./.github/workflows/ci.yml

deploy:
name: Deploy
needs: ci
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
11 changes: 4 additions & 7 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,9 +786,9 @@ impl ClientHandle {
}
// Broadcast after releasing the borrow.
for event in events_to_broadcast {
let _ = self.cmd_tx.unbounded_send(
network::NetworkCommand::BroadcastEvent { event, topic: None },
);
let _ = self
.cmd_tx
.unbounded_send(network::NetworkCommand::BroadcastEvent { event, topic: None });
}
}

Expand Down Expand Up @@ -4399,10 +4399,7 @@ mod tests {
let (client, _rx) = test_client();
client.create_server("Worker Test").unwrap();

client.authorize_workers(&[
"worker-peer-1".to_string(),
"worker-peer-2".to_string(),
]);
client.authorize_workers(&["worker-peer-1".to_string(), "worker-peer-2".to_string()]);

let shared = client.shared.borrow();
assert!(shared
Expand Down
4 changes: 1 addition & 3 deletions crates/client/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
use serde::{Deserialize, Serialize};

// Re-export wire types from willow-common so existing imports still work.
pub use willow_common::{
pack_wire, unpack_wire, VoiceSignalPayload, WireMessage,
};
pub use willow_common::{pack_wire, unpack_wire, VoiceSignalPayload, WireMessage};

// ───── Join link types ──────────────────────────────────────────────────────

Expand Down
5 changes: 4 additions & 1 deletion crates/client/src/worker_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ mod tests {
#[test]
fn worker_serving_multiple_servers() {
let mut cache = WorkerCache::new(Duration::from_secs(30));
cache.update(&make_replay_announcement("r1", vec!["srv-1", "srv-2", "srv-3"]));
cache.update(&make_replay_announcement(
"r1",
vec!["srv-1", "srv-2", "srv-3"],
));

assert_eq!(cache.replay_workers_for_server("srv-1").len(), 1);
assert_eq!(cache.replay_workers_for_server("srv-2").len(), 1);
Expand Down
17 changes: 6 additions & 11 deletions crates/common/src/worker_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub enum WorkerWireMessage {
Response {
request_id: String,
target_peer: String,
payload: WorkerResponse,
payload: Box<WorkerResponse>,
},
}

Expand Down Expand Up @@ -98,7 +98,7 @@ pub enum WorkerResponse {
SyncBatch { events: Vec<Event> },

/// Full state snapshot for far-behind peers.
Snapshot { state: ServerState },
Snapshot { state: Box<ServerState> },

/// Paginated history results.
HistoryPage { events: Vec<Event>, has_more: bool },
Expand All @@ -123,22 +123,17 @@ pub trait WorkerRole: Send + 'static {
}

/// Allocation strategy for which servers a worker serves.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub enum AllocationStrategy {
/// Serve all discovered servers (initial implementation).
#[default]
Global,
/// Serve only specific servers (future).
PerServer(Vec<String>),
/// Dynamic allocation based on load (future).
Dynamic,
}

impl Default for AllocationStrategy {
fn default() -> Self {
AllocationStrategy::Global
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -366,9 +361,9 @@ mod tests {
let msg = WorkerWireMessage::Response {
request_id: "req-456".to_string(),
target_peer: "client-peer".to_string(),
payload: WorkerResponse::Denied {
payload: Box::new(WorkerResponse::Denied {
reason: "unknown server".to_string(),
},
}),
};
let bytes = bincode::serialize(&msg).unwrap();
let decoded: WorkerWireMessage = bincode::deserialize(&bytes).unwrap();
Expand Down
3 changes: 1 addition & 2 deletions crates/replay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ struct Cli {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.init();

Expand Down
8 changes: 2 additions & 6 deletions crates/replay/src/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ impl ReplayRole {

impl WorkerRole for ReplayRole {
fn role_info(&self) -> WorkerRoleInfo {
let total_events: u32 = self
.servers
.values()
.map(|s| s.events.len() as u32)
.sum();
let total_events: u32 = self.servers.values().map(|s| s.events.len() as u32).sum();
WorkerRoleInfo::Replay {
servers_loaded: self.servers.len() as u32,
events_buffered: total_events,
Expand All @@ -128,7 +124,7 @@ impl WorkerRole for ReplayRole {
// Client is too far behind — send full snapshot.
match self.servers.get(&server_id) {
Some(data) => WorkerResponse::Snapshot {
state: data.state.clone(),
state: Box::new(data.state.clone()),
},
None => WorkerResponse::Denied {
reason: format!("unknown server: {server_id}"),
Expand Down
3 changes: 1 addition & 2 deletions crates/storage/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ struct Cli {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.init();

Expand Down
8 changes: 6 additions & 2 deletions crates/storage/src/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ impl WorkerRole for StorageRole {
channel,
before_timestamp,
limit,
} => match self.store.history(&server_id, &channel, before_timestamp, limit) {
} => match self
.store
.history(&server_id, &channel, before_timestamp, limit)
{
Ok((events, has_more)) => WorkerResponse::HistoryPage { events, has_more },
Err(e) => WorkerResponse::Denied {
reason: format!("query failed: {e}"),
Expand Down Expand Up @@ -173,7 +176,8 @@ mod tests {

match role.role_info() {
WorkerRoleInfo::Storage {
total_events_stored, ..
total_events_stored,
..
} => assert_eq!(total_events_stored, 1),
_ => panic!("expected Storage"),
}
Expand Down
56 changes: 40 additions & 16 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ impl StorageEventStore {

/// Number of distinct servers tracked.
pub fn server_count(&self) -> anyhow::Result<u32> {
let count: i64 = self.conn.query_row(
"SELECT COUNT(DISTINCT server_id) FROM events",
[],
|row| row.get(0),
)?;
let count: i64 =
self.conn
.query_row("SELECT COUNT(DISTINCT server_id) FROM events", [], |row| {
row.get(0)
})?;
Ok(count as u32)
}
}
Expand Down Expand Up @@ -193,7 +193,10 @@ mod tests {
let store = StorageEventStore::open(":memory:").unwrap();
for i in 0..5u64 {
store
.store_event("srv-1", &make_message(&format!("e{i}"), "general", (i + 1) * 1000))
.store_event(
"srv-1",
&make_message(&format!("e{i}"), "general", (i + 1) * 1000),
)
.unwrap();
}

Expand All @@ -210,7 +213,10 @@ mod tests {
let store = StorageEventStore::open(":memory:").unwrap();
for i in 0..10u64 {
store
.store_event("srv-1", &make_message(&format!("e{i}"), "general", (i + 1) * 1000))
.store_event(
"srv-1",
&make_message(&format!("e{i}"), "general", (i + 1) * 1000),
)
.unwrap();
}

Expand Down Expand Up @@ -239,9 +245,15 @@ mod tests {
#[test]
fn history_filters_by_channel() {
let store = StorageEventStore::open(":memory:").unwrap();
store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap();
store.store_event("srv-1", &make_message("e2", "random", 2000)).unwrap();
store.store_event("srv-1", &make_message("e3", "general", 3000)).unwrap();
store
.store_event("srv-1", &make_message("e1", "general", 1000))
.unwrap();
store
.store_event("srv-1", &make_message("e2", "random", 2000))
.unwrap();
store
.store_event("srv-1", &make_message("e3", "general", 3000))
.unwrap();

let (events, _) = store.history("srv-1", "general", None, 10).unwrap();
assert_eq!(events.len(), 2);
Expand All @@ -250,8 +262,12 @@ mod tests {
#[test]
fn history_filters_by_server() {
let store = StorageEventStore::open(":memory:").unwrap();
store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap();
store.store_event("srv-2", &make_message("e2", "general", 2000)).unwrap();
store
.store_event("srv-1", &make_message("e1", "general", 1000))
.unwrap();
store
.store_event("srv-2", &make_message("e2", "general", 2000))
.unwrap();

let (events, _) = store.history("srv-1", "general", None, 10).unwrap();
assert_eq!(events.len(), 1);
Expand All @@ -261,17 +277,25 @@ mod tests {
#[test]
fn server_count_tracks_distinct_servers() {
let store = StorageEventStore::open(":memory:").unwrap();
store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap();
store.store_event("srv-2", &make_message("e2", "general", 2000)).unwrap();
store.store_event("srv-1", &make_message("e3", "general", 3000)).unwrap();
store
.store_event("srv-1", &make_message("e1", "general", 1000))
.unwrap();
store
.store_event("srv-2", &make_message("e2", "general", 2000))
.unwrap();
store
.store_event("srv-1", &make_message("e3", "general", 3000))
.unwrap();

assert_eq!(store.server_count().unwrap(), 2);
}

#[test]
fn disk_usage_returns_nonzero_after_insert() {
let store = StorageEventStore::open(":memory:").unwrap();
store.store_event("srv-1", &make_message("e1", "general", 1000)).unwrap();
store
.store_event("srv-1", &make_message("e1", "general", 1000))
.unwrap();
assert!(store.disk_usage_bytes().unwrap() > 0);
}

Expand Down
4 changes: 1 addition & 3 deletions crates/web/src/icons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,7 @@ pub fn icon_database() -> impl IntoView {
/// Activity/pulse icon (heartbeat line).
pub fn icon_activity() -> impl IntoView {
icon(
&format!(
r#"<svg {SVG_ATTRS}><polyline points="22 12 18 12 15 21 9 3 6 12 2 12"/></svg>"#
),
&format!(r#"<svg {SVG_ATTRS}><polyline points="22 12 18 12 15 21 9 3 6 12 2 12"/></svg>"#),
"icon-activity",
)
}
Loading
Loading