diff --git a/Cargo.lock b/Cargo.lock index 417a60e6..6a6b614a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6001,9 +6001,11 @@ dependencies = [ "futures", "gloo-timers", "js-sys", + "parking_lot", "rusqlite", "serde", "tempfile", + "thiserror 1.0.69", "tokio", "tracing", "uuid", diff --git a/Cargo.toml b/Cargo.toml index d1845ecd..3c77010f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,3 +36,6 @@ uuid = { version = "1", features = ["v4", "serde", "js"] } # Logging tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +# Synchronization primitives (poison-free, WASM-compatible) +parking_lot = "0.12" diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 76fbfccc..11b33e73 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -22,8 +22,10 @@ willow-common = { path = "../common" } anyhow = { workspace = true } bytes = { workspace = true } serde = { workspace = true } +thiserror = { workspace = true } uuid = { workspace = true } tracing = { workspace = true } +parking_lot = { workspace = true } futures = "0.3" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/crates/client/src/joining.rs b/crates/client/src/joining.rs index 0f1e5749..7503e108 100644 --- a/crates/client/src/joining.rs +++ b/crates/client/src/joining.rs @@ -48,10 +48,19 @@ impl ClientHandle { .next() .map(|(name, _)| name.clone()); + // Validate the server id BEFORE we touch any actor state. If the + // invite is malformed we want to surface a typed error to the + // caller instead of silently inventing a fresh server id, which + // would split-brain the joiner from the rest of the network + // (issue #115). + let parsed_server_uuid = uuid::Uuid::parse_str(&server_id).map_err(|e| { + crate::ClientError::MalformedInvite(format!("invalid server_id `{server_id}`: {e}")) + })?; + // Update server registry. let channel_topics = willow_actor::state::mutate( &self.server_registry_addr, - move |reg| -> anyhow::Result> { + move |reg| -> Result, crate::ClientError> { if let Some(entry) = reg.servers.get_mut(&server_id) { for (topic, (name, key)) in &accepted.channel_keys { entry.keys.insert(topic.clone(), key.clone()); @@ -63,11 +72,8 @@ impl ClientHandle { } } } else { - let parsed_id = willow_channel::ServerId( - uuid::Uuid::parse_str(&server_id).unwrap_or_else(|_| uuid::Uuid::new_v4()), - ); let mut server = willow_channel::Server::with_id( - parsed_id, + willow_channel::ServerId(parsed_server_uuid), &accepted.server_name, accepted.genesis_author, ); @@ -76,7 +82,11 @@ impl ClientHandle { for (topic, (name, key)) in &accepted.channel_keys { let ch_id = server .create_channel(name, willow_channel::ChannelKind::Text) - .unwrap_or_else(|_| willow_channel::ChannelId::new()); + .map_err(|e| { + crate::ClientError::MalformedInvite(format!( + "could not create channel `{name}` from invite: {e}" + )) + })?; server.set_channel_key(ch_id.clone(), key.clone()); keys.insert(topic.clone(), key.clone()); topic_map.insert(topic.clone(), (name.clone(), ch_id)); @@ -206,20 +216,17 @@ impl ClientHandle { server_name, inviter_name, }; - self.join_links.lock().unwrap().push(link); + self.join_links.lock().push(link); Ok(token.encode()) } pub async fn join_links(&self) -> Vec { - self.join_links.lock().unwrap().clone() + self.join_links.lock().clone() } pub async fn delete_join_link(&self, link_id: &str) { let link_id = link_id.to_string(); - self.join_links - .lock() - .unwrap() - .retain(|l| l.link_id != link_id); + self.join_links.lock().retain(|l| l.link_id != link_id); } pub async fn set_display_name(&self, name: &str) { diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 25b39b80..f274ce12 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -47,6 +47,22 @@ pub use event_receiver::EventReceiver; pub use events::ClientEvent; pub use ops::{pack_wire, unpack_wire, VoiceSignalPayload, WireMessage}; +/// Errors returned by client API entry points. +/// +/// New variants are added as the crate's surface grows. For now this +/// covers the few cases where we explicitly want to surface a typed +/// failure to callers (rather than swallowing it via `anyhow`). +#[derive(Debug, thiserror::Error)] +pub enum ClientError { + /// An invite payload could not be parsed: a field was missing or + /// contained malformed data (e.g. a non-UUID server id, or a channel + /// that the local server cannot create). Joining is aborted instead + /// of silently inventing fresh IDs, which would split-brain the + /// joiner from the rest of the network. + #[error("malformed invite: {0}")] + MalformedInvite(String), +} + /// Helper to bridge `Broker` into an async stream receiver. pub mod event_receiver { use crate::events::ClientEvent; @@ -181,7 +197,10 @@ pub struct ClientHandle { /// Whether persistence to disk is enabled. pub(crate) persistence_enabled: bool, /// Active join links (rarely modified, shared across tasks). - pub(crate) join_links: Arc>>, + /// + /// Uses `parking_lot::Mutex` so a panic while holding the guard does + /// not poison the lock and take down every future caller (issue #114). + pub(crate) join_links: Arc>>, /// Bootstrap peers for gossip topic subscriptions. pub bootstrap_peers: Vec, /// The per-author Merkle-DAG actor — source of truth for all events. @@ -472,7 +491,7 @@ impl ClientHandle { state_actors::DagState::default(), )); let topics: Arc>> = Arc::new(RwLock::new(HashMap::new())); - let join_links = Arc::new(std::sync::Mutex::new(Vec::new())); + let join_links = Arc::new(parking_lot::Mutex::new(Vec::new())); // Build StateRefs for derived actor sources. let event_ref = willow_actor::state::StateRef::from(&event_state_addr); @@ -782,7 +801,7 @@ pub fn test_client() -> ( HashMap::Topic>, >, > = Arc::new(RwLock::new(HashMap::new())); - let join_links = Arc::new(std::sync::Mutex::new(Vec::new())); + let join_links = Arc::new(parking_lot::Mutex::new(Vec::new())); // Build StateRefs and derived views. let event_ref = willow_actor::state::StateRef::from(&event_state_addr); @@ -1058,4 +1077,101 @@ mod tests { "Bob should have SendMessages after generate_invite" ); } + + /// Regression test for issue #115: an invite carrying a non-UUID + /// `server_id` must be rejected with [`ClientError::MalformedInvite`] + /// instead of being silently rewritten to a freshly minted UUID + /// (which would split-brain the joiner from every other peer). + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn malformed_invite_server_id_is_rejected() { + // The test client uses its own randomly generated identity, so + // we have to encrypt the invite for that identity in order for + // the decryption stage of accept_invite to succeed and the + // validation we care about to actually run. + let (client, _rx) = test_client(); + let recipient_pub = invite::endpoint_id_to_ed25519_public(&client.identity.endpoint_id()); + + // Build a real server + channel + key, then ask generate_invite + // to encrypt the channel key for the test client. + let inviter = Identity::generate(); + let mut server = willow_channel::Server::new("Tamper Server", inviter.endpoint_id()); + let ch_id = server + .create_channel("general", willow_channel::ChannelKind::Text) + .unwrap(); + let topic = util::make_topic(&server, "general"); + let mut keys = HashMap::new(); + if let Some(k) = server.channel_key(&ch_id) { + keys.insert(topic.clone(), k.clone()); + } + let mut topic_map = HashMap::new(); + topic_map.insert(topic.clone(), ("general".to_string(), ch_id)); + let valid_code = invite::generate_invite(&server, &keys, &topic_map, &recipient_pub) + .expect("invite generation must succeed"); + + // Tamper with the embedded server_id so it no longer parses as + // a UUID. This is the exact failure mode #115 describes: an + // invite that looks valid right up to the point where we ask + // willow_channel for a ServerId. + let raw = base64::decode(&valid_code).unwrap(); + let mut payload: invite::InvitePayload = willow_transport::unpack(&raw).unwrap(); + payload.server_id = "not-a-uuid".to_string(); + let tampered_bytes = willow_transport::pack(&payload).unwrap(); + let tampered_code = base64::encode(&tampered_bytes); + + // accept_invite must surface a typed MalformedInvite error + // instead of silently inventing a fresh UUID. + let err = client + .accept_invite(&tampered_code) + .await + .expect_err("malformed invite must be rejected"); + let downcast = err + .downcast::() + .expect("error must be a ClientError::MalformedInvite"); + assert!( + matches!(downcast, ClientError::MalformedInvite(ref msg) if msg.contains("server_id")), + "expected MalformedInvite about server_id, got {downcast:?}" + ); + } + + /// Regression test for issue #114: switching `join_links` to + /// `parking_lot::Mutex` makes lock poisoning impossible by + /// construction. After a panic in one task that holds the lock, a + /// subsequent caller from another task must still be able to + /// acquire it without panicking. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn join_links_lock_survives_panic_in_holder() { + let (client, _rx) = test_client(); + + // Spawn a task that grabs the lock, mutates the vec, then panics + // while still holding the guard. With std::sync::Mutex this + // would poison the mutex and every future caller would panic on + // .lock().unwrap(). With parking_lot the lock is simply + // released and remains usable. + let join_links = Arc::clone(&client.join_links); + let panicker = tokio::task::spawn_blocking(move || { + let mut guard = join_links.lock(); + guard.push(crate::ops::JoinLink { + link_id: "first".to_string(), + server_id: "s".to_string(), + max_uses: 1, + used: 0, + active: true, + expires_at: None, + created_at: 0, + }); + panic!("simulated panic while holding the join_links guard"); + }); + let join_result = panicker.await; + assert!(join_result.is_err(), "task should have panicked"); + + // The next caller must NOT panic. Both reads and writes should + // succeed. + let snapshot = client.join_links().await; + assert_eq!(snapshot.len(), 1); + assert_eq!(snapshot[0].link_id, "first"); + + client.delete_join_link("first").await; + let snapshot = client.join_links().await; + assert!(snapshot.is_empty()); + } } diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index 6d261af8..46e6c6e1 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -1,6 +1,8 @@ //! Per-topic listener tasks that stream GossipEvents and mutate state via domain actors. -use std::sync::{Arc, Mutex}; +use std::sync::Arc; + +use parking_lot::Mutex; use crate::events::ClientEvent; use crate::mutations; @@ -330,7 +332,7 @@ async fn process_received_message( } crate::ops::WireMessage::JoinRequest { link_id, peer_id } => { let should_respond = { - let mut links = ctx.join_links.lock().unwrap(); + let mut links = ctx.join_links.lock(); let valid = links .iter_mut() .find(|l| l.link_id == link_id && l.is_valid()); diff --git a/crates/client/src/mutations.rs b/crates/client/src/mutations.rs index 974f49f7..93a1e2c3 100644 --- a/crates/client/src/mutations.rs +++ b/crates/client/src/mutations.rs @@ -11,7 +11,9 @@ //! ``` use std::collections::HashMap; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; + +use parking_lot::Mutex; use willow_actor::{Addr, Broker, Publish, StateActor}; use willow_identity::{EndpointId, Identity};