diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index b2543223..fa2451da 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -31,6 +31,21 @@ pub(crate) const MAX_DISPLAY_NAME_LEN: usize = 128; /// as [`MAX_DISPLAY_NAME_LEN`]. pub(crate) const MAX_TYPING_CHANNEL_LEN: usize = 128; +/// Maximum number of distinct `channel_id`s tracked in +/// [`state_actors::VoiceState::participants`]. Caps the outer dimension of +/// the map so a malicious peer cannot flood `VoiceJoin` with fresh +/// `channel_id`s and exhaust memory on receiving clients. Real servers +/// have far fewer than this many voice channels in practice; the cap is +/// a defensive ceiling, not an expected operating point. See `[SEC-V-03]` +/// (issue #303). +pub(crate) const MAX_VOICE_CHANNELS: usize = 256; + +/// Maximum number of `peer_id`s tracked per voice channel in +/// [`state_actors::VoiceState::participants`]. Caps the inner dimension of +/// the map. Same threat model as [`MAX_VOICE_CHANNELS`]. See `[SEC-V-03]` +/// (issue #303). +pub(crate) const MAX_PARTICIPANTS_PER_CHANNEL: usize = 256; + /// Truncate `s` to at most `max` Unicode scalar values, returning the original /// string unchanged if it already fits. Splits at `char` boundaries so the /// returned `String` is always valid UTF-8. @@ -372,11 +387,45 @@ async fn process_received_message( channel_id, peer_id, } => { + // [SEC-V-03] / #303: drop voice messages whose `channel_id` + // does not exist in the materialized server state, and bound + // both the outer (distinct channels) and inner (participants + // per channel) dimensions of `VoiceState.participants`. + // Without these gates, any signed peer can flood arbitrary + // `channel_id`s and exhaust receiving clients' memory. + let known = willow_actor::state::select(&ctx.event_state, { + let ch = channel_id.clone(); + move |es| es.channels.contains_key(&ch) + }) + .await; + if !known { + tracing::debug!( + %signer, channel_id = %channel_id, + "dropping VoiceJoin: channel_id not in ServerState.channels" + ); + return; + } let ch = channel_id.clone(); - willow_actor::state::mutate(&ctx.voice, move |v| { - v.participants.entry(ch).or_default().insert(peer_id); + let inserted = willow_actor::state::mutate(&ctx.voice, move |v| { + let new_channel = !v.participants.contains_key(&ch); + if new_channel && v.participants.len() >= MAX_VOICE_CHANNELS { + return false; + } + let set = v.participants.entry(ch).or_default(); + if !set.contains(&peer_id) && set.len() >= MAX_PARTICIPANTS_PER_CHANNEL { + return false; + } + set.insert(peer_id); + true }) .await; + if !inserted { + tracing::debug!( + %signer, channel_id = %channel_id, %peer_id, + "dropping VoiceJoin: VoiceState cap reached" + ); + return; + } ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { channel_id, @@ -388,6 +437,23 @@ async fn process_received_message( channel_id, peer_id, } => { + // [SEC-V-03] / #303: drop Leaves for unknown channels so the + // event broker never publishes phantom-channel events to the + // UI. The `participants` map is unaffected even without the + // gate (`get_mut` returns None), but the event-broker fanout + // would still leak attacker-controlled `channel_id`s. + let known = willow_actor::state::select(&ctx.event_state, { + let ch = channel_id.clone(); + move |es| es.channels.contains_key(&ch) + }) + .await; + if !known { + tracing::debug!( + %signer, channel_id = %channel_id, + "dropping VoiceLeave: channel_id not in ServerState.channels" + ); + return; + } let ch = channel_id.clone(); willow_actor::state::mutate(&ctx.voice, move |v| { if let Some(p) = v.participants.get_mut(&ch) { @@ -408,6 +474,23 @@ async fn process_received_message( signal, } => { if target_peer == ctx.identity.endpoint_id() { + // [SEC-V-03] / #303: drop signals for unknown channels so + // attacker-supplied `channel_id`s never reach the UI's + // event handlers. Signals do not mutate `participants` + // directly, but the existence check shuts the same fanout + // gap as Join/Leave. + let known = willow_actor::state::select(&ctx.event_state, { + let ch = channel_id.clone(); + move |es| es.channels.contains_key(&ch) + }) + .await; + if !known { + tracing::debug!( + %signer, channel_id = %channel_id, + "dropping VoiceSignal: channel_id not in ServerState.channels" + ); + return; + } ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { channel_id, @@ -620,7 +703,7 @@ async fn process_received_message( mod tests { //! Listener tests for the JoinRequest signer guard (SEC-A-03 / #239). use super::*; - use crate::test_client; + use crate::{test_client, ClientHandle}; use std::sync::Arc; use willow_network::Network; @@ -946,6 +1029,270 @@ mod tests { ); } + // ─── [SEC-V-03] / #303 — voice.participants growth caps ──────────────── + + /// Build a `ListenerCtx` from a `test_client()` ClientHandle. + fn make_ctx(client: &ClientHandle) -> ListenerCtx { + ListenerCtx { + event_state: client.event_state_addr.clone(), + chat_meta: client.chat_meta_addr.clone(), + profiles: client.profile_state_addr.clone(), + network: client.network_meta_addr.clone(), + voice: client.voice_state_addr.clone(), + persistence: client.persistence_addr.clone(), + persistence_enabled: false, + event_broker: client.event_broker.clone(), + identity: client.identity.clone(), + join_links: Arc::clone(&client.join_links), + dag: client.dag_addr.clone(), + server_registry: client.server_registry_addr.clone(), + on_neighbor_up: None, + } + } + + /// Helper to read the current `VoiceState`. + async fn voice_snapshot( + client: &ClientHandle, + ) -> state_actors::VoiceState { + willow_actor::state::select(&client.voice_state_addr, |v| v.clone()).await + } + + /// Resolve the test client's known channel id (the "general" channel + /// seeded by `test_client()`). + async fn known_channel_id(client: &ClientHandle) -> String { + let snap = client.state_snapshot().await; + snap.channels + .keys() + .next() + .cloned() + .expect("test_client must seed at least one channel") + } + + /// VoiceJoin against an unknown `channel_id` must not mutate + /// `VoiceState.participants` (defends against attackers flooding + /// random channel ids until memory is exhausted). + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn voice_join_with_unknown_channel_is_dropped() { + let (client, _rx) = test_client(); + let hub = willow_network::mem::MemHub::new(); + let net = willow_network::mem::MemNetwork::new(&hub); + let (topic_handle, _events) = net + .subscribe(willow_network::topic_id("unit-test-voice-unknown"), vec![]) + .await + .expect("subscribe must succeed"); + let ctx = make_ctx(&client); + + let attacker = willow_identity::Identity::generate(); + let attacker_id = attacker.endpoint_id(); + + let msg = crate::ops::WireMessage::VoiceJoin { + channel_id: "no-such-channel".to_string(), + peer_id: attacker_id, + }; + let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed"); + process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await; + + // Brief settle window for actor mailboxes. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let snap = voice_snapshot(&client).await; + assert!( + snap.participants.is_empty(), + "VoiceJoin against an unknown channel_id must not create an entry; got {:?}", + snap.participants + ); + } + + /// Sanity check: VoiceJoin against a real channel records the + /// participant. Without this we wouldn't know the cap test below + /// is exercising the real path. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn voice_join_with_known_channel_records_participant() { + let (client, _rx) = test_client(); + let hub = willow_network::mem::MemHub::new(); + let net = willow_network::mem::MemNetwork::new(&hub); + let (topic_handle, _events) = net + .subscribe(willow_network::topic_id("unit-test-voice-known"), vec![]) + .await + .expect("subscribe must succeed"); + let ctx = make_ctx(&client); + let channel_id = known_channel_id(&client).await; + + let attacker = willow_identity::Identity::generate(); + let attacker_id = attacker.endpoint_id(); + let msg = crate::ops::WireMessage::VoiceJoin { + channel_id: channel_id.clone(), + peer_id: attacker_id, + }; + let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed"); + process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await; + + let stored = poll_until_some( + || { + let ch = channel_id.clone(); + willow_actor::state::select(&client.voice_state_addr, move |v| { + v.participants + .get(&ch) + .filter(|set| set.contains(&attacker_id)) + .map(|set| set.len()) + }) + }, + std::time::Duration::from_secs(2), + ) + .await; + assert_eq!( + stored, + Some(1), + "VoiceJoin on a known channel must record the participant" + ); + } + + /// Once `MAX_VOICE_CHANNELS` distinct channel_ids have been recorded, + /// further VoiceJoin packets that reference *new* channel_ids must be + /// dropped — even if the channel exists in `ServerState`. This caps the + /// outer dimension of `participants`. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn voice_join_rejects_when_distinct_channels_at_cap() { + let (client, _rx) = test_client(); + let hub = willow_network::mem::MemHub::new(); + let net = willow_network::mem::MemNetwork::new(&hub); + let (topic_handle, _events) = net + .subscribe(willow_network::topic_id("unit-test-voice-chcap"), vec![]) + .await + .expect("subscribe must succeed"); + let ctx = make_ctx(&client); + let channel_id = known_channel_id(&client).await; + + // Pre-fill `participants` with `MAX_VOICE_CHANNELS` distinct + // synthetic channel_ids. We seed directly via the actor — the + // listener would normally reject any of these as unknown, but + // we want to test the *cap* in isolation. `state_snapshot()`'s + // single channel is added below as the (cap+1)-th attempt. + willow_actor::state::mutate(&client.voice_state_addr, move |v| { + for i in 0..MAX_VOICE_CHANNELS { + v.participants.entry(format!("filler-{i}")).or_default(); + } + }) + .await; + + let attacker = willow_identity::Identity::generate(); + let attacker_id = attacker.endpoint_id(); + // Use the *real* channel_id so the existence check passes; the + // cap is what must reject this message. + let msg = crate::ops::WireMessage::VoiceJoin { + channel_id: channel_id.clone(), + peer_id: attacker_id, + }; + let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed"); + process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let snap = voice_snapshot(&client).await; + assert_eq!( + snap.participants.len(), + MAX_VOICE_CHANNELS, + "distinct-channels cap must hold against new VoiceJoin entries" + ); + assert!( + !snap.participants.contains_key(&channel_id), + "VoiceJoin for a new channel_id must be dropped once the cap is reached" + ); + } + + /// Once a single channel has `MAX_PARTICIPANTS_PER_CHANNEL` participants, + /// further VoiceJoin packets for that channel must be dropped. This caps + /// the inner dimension of `participants`. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn voice_join_rejects_when_participants_at_cap() { + let (client, _rx) = test_client(); + let hub = willow_network::mem::MemHub::new(); + let net = willow_network::mem::MemNetwork::new(&hub); + let (topic_handle, _events) = net + .subscribe(willow_network::topic_id("unit-test-voice-pcap"), vec![]) + .await + .expect("subscribe must succeed"); + let ctx = make_ctx(&client); + let channel_id = known_channel_id(&client).await; + + // Fill the cap with synthetic participants on the real channel. + let cap_channel = channel_id.clone(); + willow_actor::state::mutate(&client.voice_state_addr, move |v| { + let set = v.participants.entry(cap_channel).or_default(); + for _ in 0..MAX_PARTICIPANTS_PER_CHANNEL { + set.insert(willow_identity::Identity::generate().endpoint_id()); + } + }) + .await; + + // Confirm preconditions. + let pre = voice_snapshot(&client).await; + assert_eq!( + pre.participants.get(&channel_id).map(|s| s.len()), + Some(MAX_PARTICIPANTS_PER_CHANNEL), + "test setup must fill participants to cap" + ); + + let attacker = willow_identity::Identity::generate(); + let attacker_id = attacker.endpoint_id(); + let msg = crate::ops::WireMessage::VoiceJoin { + channel_id: channel_id.clone(), + peer_id: attacker_id, + }; + let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed"); + process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let snap = voice_snapshot(&client).await; + let set = snap + .participants + .get(&channel_id) + .expect("channel set must still exist"); + assert_eq!( + set.len(), + MAX_PARTICIPANTS_PER_CHANNEL, + "participant cap must hold; got {}", + set.len() + ); + assert!( + !set.contains(&attacker_id), + "new participant must be dropped once cap is reached" + ); + } + + /// VoiceLeave against an unknown channel_id must not create an entry + /// in `participants`. Without the gate, an attacker can still grow + /// the map by sending Leaves first (`get_mut` returns None today, so + /// this is effectively a no-op — but we still want to assert the gate + /// so future refactors don't reintroduce the hole, and we add a + /// `tracing::debug!` for observability symmetry with Join). + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn voice_leave_with_unknown_channel_is_dropped() { + let (client, _rx) = test_client(); + let hub = willow_network::mem::MemHub::new(); + let net = willow_network::mem::MemNetwork::new(&hub); + let (topic_handle, _events) = net + .subscribe(willow_network::topic_id("unit-test-voice-leave"), vec![]) + .await + .expect("subscribe must succeed"); + let ctx = make_ctx(&client); + + let attacker = willow_identity::Identity::generate(); + let attacker_id = attacker.endpoint_id(); + let msg = crate::ops::WireMessage::VoiceLeave { + channel_id: "no-such-channel".to_string(), + peer_id: attacker_id, + }; + let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed"); + process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let snap = voice_snapshot(&client).await; + assert!( + snap.participants.is_empty(), + "VoiceLeave against an unknown channel_id must not produce a state mutation" + ); + } + /// Poll a closure that returns `impl Future>` until /// it yields `Some` or the deadline expires. Returns the last observed /// value (whether `Some` or `None`).