Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ uuid = { version = "1", features = ["v4", "serde", "js"] }
# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Synchronization primitives (poison-free, WASM-compatible)
parking_lot = "0.12"
2 changes: 2 additions & 0 deletions crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ willow-common = { path = "../common" }
anyhow = { workspace = true }
bytes = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }
tracing = { workspace = true }
parking_lot = { workspace = true }
futures = "0.3"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
31 changes: 19 additions & 12 deletions crates/client/src/joining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,19 @@ impl<N: willow_network::Network> ClientHandle<N> {
.next()
.map(|(name, _)| name.clone());

// Validate the server id BEFORE we touch any actor state. If the
// invite is malformed we want to surface a typed error to the
// caller instead of silently inventing a fresh server id, which
// would split-brain the joiner from the rest of the network
// (issue #115).
let parsed_server_uuid = uuid::Uuid::parse_str(&server_id).map_err(|e| {
crate::ClientError::MalformedInvite(format!("invalid server_id `{server_id}`: {e}"))
})?;

// Update server registry.
let channel_topics = willow_actor::state::mutate(
&self.server_registry_addr,
move |reg| -> anyhow::Result<Vec<String>> {
move |reg| -> Result<Vec<String>, crate::ClientError> {
if let Some(entry) = reg.servers.get_mut(&server_id) {
for (topic, (name, key)) in &accepted.channel_keys {
entry.keys.insert(topic.clone(), key.clone());
Expand All @@ -63,11 +72,8 @@ impl<N: willow_network::Network> ClientHandle<N> {
}
}
} else {
let parsed_id = willow_channel::ServerId(
uuid::Uuid::parse_str(&server_id).unwrap_or_else(|_| uuid::Uuid::new_v4()),
);
let mut server = willow_channel::Server::with_id(
parsed_id,
willow_channel::ServerId(parsed_server_uuid),
&accepted.server_name,
accepted.genesis_author,
);
Expand All @@ -76,7 +82,11 @@ impl<N: willow_network::Network> ClientHandle<N> {
for (topic, (name, key)) in &accepted.channel_keys {
let ch_id = server
.create_channel(name, willow_channel::ChannelKind::Text)
.unwrap_or_else(|_| willow_channel::ChannelId::new());
.map_err(|e| {
crate::ClientError::MalformedInvite(format!(
"could not create channel `{name}` from invite: {e}"
))
})?;
server.set_channel_key(ch_id.clone(), key.clone());
keys.insert(topic.clone(), key.clone());
topic_map.insert(topic.clone(), (name.clone(), ch_id));
Expand Down Expand Up @@ -206,20 +216,17 @@ impl<N: willow_network::Network> ClientHandle<N> {
server_name,
inviter_name,
};
self.join_links.lock().unwrap().push(link);
self.join_links.lock().push(link);
Ok(token.encode())
}

pub async fn join_links(&self) -> Vec<ops::JoinLink> {
self.join_links.lock().unwrap().clone()
self.join_links.lock().clone()
}

pub async fn delete_join_link(&self, link_id: &str) {
let link_id = link_id.to_string();
self.join_links
.lock()
.unwrap()
.retain(|l| l.link_id != link_id);
self.join_links.lock().retain(|l| l.link_id != link_id);
}

pub async fn set_display_name(&self, name: &str) {
Expand Down
122 changes: 119 additions & 3 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ pub use event_receiver::EventReceiver;
pub use events::ClientEvent;
pub use ops::{pack_wire, unpack_wire, VoiceSignalPayload, WireMessage};

/// Errors returned by client API entry points.
///
/// New variants are added as the crate's surface grows. For now this
/// covers the few cases where we explicitly want to surface a typed
/// failure to callers (rather than swallowing it via `anyhow`).
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
/// An invite payload could not be parsed: a field was missing or
/// contained malformed data (e.g. a non-UUID server id, or a channel
/// that the local server cannot create). Joining is aborted instead
/// of silently inventing fresh IDs, which would split-brain the
/// joiner from the rest of the network.
#[error("malformed invite: {0}")]
MalformedInvite(String),
}

/// Helper to bridge `Broker<ClientEvent>` into an async stream receiver.
pub mod event_receiver {
use crate::events::ClientEvent;
Expand Down Expand Up @@ -181,7 +197,10 @@ pub struct ClientHandle<N: willow_network::Network> {
/// Whether persistence to disk is enabled.
pub(crate) persistence_enabled: bool,
/// Active join links (rarely modified, shared across tasks).
pub(crate) join_links: Arc<std::sync::Mutex<Vec<ops::JoinLink>>>,
///
/// Uses `parking_lot::Mutex` so a panic while holding the guard does
/// not poison the lock and take down every future caller (issue #114).
pub(crate) join_links: Arc<parking_lot::Mutex<Vec<ops::JoinLink>>>,
/// Bootstrap peers for gossip topic subscriptions.
pub bootstrap_peers: Vec<willow_identity::EndpointId>,
/// The per-author Merkle-DAG actor — source of truth for all events.
Expand Down Expand Up @@ -472,7 +491,7 @@ impl<N: willow_network::Network> ClientHandle<N> {
state_actors::DagState::default(),
));
let topics: Arc<RwLock<HashMap<String, N::Topic>>> = Arc::new(RwLock::new(HashMap::new()));
let join_links = Arc::new(std::sync::Mutex::new(Vec::new()));
let join_links = Arc::new(parking_lot::Mutex::new(Vec::new()));

// Build StateRefs for derived actor sources.
let event_ref = willow_actor::state::StateRef::from(&event_state_addr);
Expand Down Expand Up @@ -782,7 +801,7 @@ pub fn test_client() -> (
HashMap<String, <willow_network::mem::MemNetwork as willow_network::Network>::Topic>,
>,
> = Arc::new(RwLock::new(HashMap::new()));
let join_links = Arc::new(std::sync::Mutex::new(Vec::new()));
let join_links = Arc::new(parking_lot::Mutex::new(Vec::new()));

// Build StateRefs and derived views.
let event_ref = willow_actor::state::StateRef::from(&event_state_addr);
Expand Down Expand Up @@ -1058,4 +1077,101 @@ mod tests {
"Bob should have SendMessages after generate_invite"
);
}

/// Regression test for issue #115: an invite carrying a non-UUID
/// `server_id` must be rejected with [`ClientError::MalformedInvite`]
/// instead of being silently rewritten to a freshly minted UUID
/// (which would split-brain the joiner from every other peer).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn malformed_invite_server_id_is_rejected() {
// The test client uses its own randomly generated identity, so
// we have to encrypt the invite for that identity in order for
// the decryption stage of accept_invite to succeed and the
// validation we care about to actually run.
let (client, _rx) = test_client();
let recipient_pub = invite::endpoint_id_to_ed25519_public(&client.identity.endpoint_id());

// Build a real server + channel + key, then ask generate_invite
// to encrypt the channel key for the test client.
let inviter = Identity::generate();
let mut server = willow_channel::Server::new("Tamper Server", inviter.endpoint_id());
let ch_id = server
.create_channel("general", willow_channel::ChannelKind::Text)
.unwrap();
let topic = util::make_topic(&server, "general");
let mut keys = HashMap::new();
if let Some(k) = server.channel_key(&ch_id) {
keys.insert(topic.clone(), k.clone());
}
let mut topic_map = HashMap::new();
topic_map.insert(topic.clone(), ("general".to_string(), ch_id));
let valid_code = invite::generate_invite(&server, &keys, &topic_map, &recipient_pub)
.expect("invite generation must succeed");

// Tamper with the embedded server_id so it no longer parses as
// a UUID. This is the exact failure mode #115 describes: an
// invite that looks valid right up to the point where we ask
// willow_channel for a ServerId.
let raw = base64::decode(&valid_code).unwrap();
let mut payload: invite::InvitePayload = willow_transport::unpack(&raw).unwrap();
payload.server_id = "not-a-uuid".to_string();
let tampered_bytes = willow_transport::pack(&payload).unwrap();
let tampered_code = base64::encode(&tampered_bytes);

// accept_invite must surface a typed MalformedInvite error
// instead of silently inventing a fresh UUID.
let err = client
.accept_invite(&tampered_code)
.await
.expect_err("malformed invite must be rejected");
let downcast = err
.downcast::<ClientError>()
.expect("error must be a ClientError::MalformedInvite");
assert!(
matches!(downcast, ClientError::MalformedInvite(ref msg) if msg.contains("server_id")),
"expected MalformedInvite about server_id, got {downcast:?}"
);
}

/// Regression test for issue #114: switching `join_links` to
/// `parking_lot::Mutex` makes lock poisoning impossible by
/// construction. After a panic in one task that holds the lock, a
/// subsequent caller from another task must still be able to
/// acquire it without panicking.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn join_links_lock_survives_panic_in_holder() {
let (client, _rx) = test_client();

// Spawn a task that grabs the lock, mutates the vec, then panics
// while still holding the guard. With std::sync::Mutex this
// would poison the mutex and every future caller would panic on
// .lock().unwrap(). With parking_lot the lock is simply
// released and remains usable.
let join_links = Arc::clone(&client.join_links);
let panicker = tokio::task::spawn_blocking(move || {
let mut guard = join_links.lock();
guard.push(crate::ops::JoinLink {
link_id: "first".to_string(),
server_id: "s".to_string(),
max_uses: 1,
used: 0,
active: true,
expires_at: None,
created_at: 0,
});
panic!("simulated panic while holding the join_links guard");
});
let join_result = panicker.await;
assert!(join_result.is_err(), "task should have panicked");

// The next caller must NOT panic. Both reads and writes should
// succeed.
let snapshot = client.join_links().await;
assert_eq!(snapshot.len(), 1);
assert_eq!(snapshot[0].link_id, "first");

client.delete_join_link("first").await;
let snapshot = client.join_links().await;
assert!(snapshot.is_empty());
}
}
6 changes: 4 additions & 2 deletions crates/client/src/listeners.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Per-topic listener tasks that stream GossipEvents and mutate state via domain actors.

use std::sync::{Arc, Mutex};
use std::sync::Arc;

use parking_lot::Mutex;

use crate::events::ClientEvent;
use crate::mutations;
Expand Down Expand Up @@ -330,7 +332,7 @@ async fn process_received_message<T: TopicHandle>(
}
crate::ops::WireMessage::JoinRequest { link_id, peer_id } => {
let should_respond = {
let mut links = ctx.join_links.lock().unwrap();
let mut links = ctx.join_links.lock();
let valid = links
.iter_mut()
.find(|l| l.link_id == link_id && l.is_valid());
Expand Down
4 changes: 3 additions & 1 deletion crates/client/src/mutations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
//! ```

use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};

use parking_lot::Mutex;

use willow_actor::{Addr, Broker, Publish, StateActor};
use willow_identity::{EndpointId, Identity};
Expand Down
Loading