Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 350 additions & 3 deletions crates/client/src/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@ pub(crate) const MAX_DISPLAY_NAME_LEN: usize = 128;
/// as [`MAX_DISPLAY_NAME_LEN`].
pub(crate) const MAX_TYPING_CHANNEL_LEN: usize = 128;

/// Maximum number of distinct `channel_id`s tracked in
/// [`state_actors::VoiceState::participants`]. Caps the outer dimension of
/// the map so a malicious peer cannot flood `VoiceJoin` with fresh
/// `channel_id`s and exhaust memory on receiving clients. Real servers
/// have far fewer than this many voice channels in practice; the cap is
/// a defensive ceiling, not an expected operating point. See `[SEC-V-03]`
/// (issue #303).
pub(crate) const MAX_VOICE_CHANNELS: usize = 256;

/// Maximum number of `peer_id`s tracked per voice channel in
/// [`state_actors::VoiceState::participants`]. Caps the inner dimension of
/// the map. Same threat model as [`MAX_VOICE_CHANNELS`]. See `[SEC-V-03]`
/// (issue #303).
pub(crate) const MAX_PARTICIPANTS_PER_CHANNEL: usize = 256;

/// Truncate `s` to at most `max` Unicode scalar values, returning the original
/// string unchanged if it already fits. Splits at `char` boundaries so the
/// returned `String` is always valid UTF-8.
Expand Down Expand Up @@ -372,11 +387,45 @@ async fn process_received_message<T: TopicHandle>(
channel_id,
peer_id,
} => {
// [SEC-V-03] / #303: drop voice messages whose `channel_id`
// does not exist in the materialized server state, and bound
// both the outer (distinct channels) and inner (participants
// per channel) dimensions of `VoiceState.participants`.
// Without these gates, any signed peer can flood arbitrary
// `channel_id`s and exhaust receiving clients' memory.
let known = willow_actor::state::select(&ctx.event_state, {
let ch = channel_id.clone();
move |es| es.channels.contains_key(&ch)
})
.await;
if !known {
tracing::debug!(
%signer, channel_id = %channel_id,
"dropping VoiceJoin: channel_id not in ServerState.channels"
);
return;
}
let ch = channel_id.clone();
willow_actor::state::mutate(&ctx.voice, move |v| {
v.participants.entry(ch).or_default().insert(peer_id);
let inserted = willow_actor::state::mutate(&ctx.voice, move |v| {
let new_channel = !v.participants.contains_key(&ch);
if new_channel && v.participants.len() >= MAX_VOICE_CHANNELS {
return false;
}
let set = v.participants.entry(ch).or_default();
if !set.contains(&peer_id) && set.len() >= MAX_PARTICIPANTS_PER_CHANNEL {
return false;
}
set.insert(peer_id);
true
})
.await;
if !inserted {
tracing::debug!(
%signer, channel_id = %channel_id, %peer_id,
"dropping VoiceJoin: VoiceState cap reached"
);
return;
}
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceJoined {
channel_id,
Expand All @@ -388,6 +437,23 @@ async fn process_received_message<T: TopicHandle>(
channel_id,
peer_id,
} => {
// [SEC-V-03] / #303: drop Leaves for unknown channels so the
// event broker never publishes phantom-channel events to the
// UI. The `participants` map is unaffected even without the
// gate (`get_mut` returns None), but the event-broker fanout
// would still leak attacker-controlled `channel_id`s.
let known = willow_actor::state::select(&ctx.event_state, {
let ch = channel_id.clone();
move |es| es.channels.contains_key(&ch)
})
.await;
if !known {
tracing::debug!(
%signer, channel_id = %channel_id,
"dropping VoiceLeave: channel_id not in ServerState.channels"
);
return;
}
let ch = channel_id.clone();
willow_actor::state::mutate(&ctx.voice, move |v| {
if let Some(p) = v.participants.get_mut(&ch) {
Expand All @@ -408,6 +474,23 @@ async fn process_received_message<T: TopicHandle>(
signal,
} => {
if target_peer == ctx.identity.endpoint_id() {
// [SEC-V-03] / #303: drop signals for unknown channels so
// attacker-supplied `channel_id`s never reach the UI's
// event handlers. Signals do not mutate `participants`
// directly, but the existence check shuts the same fanout
// gap as Join/Leave.
let known = willow_actor::state::select(&ctx.event_state, {
let ch = channel_id.clone();
move |es| es.channels.contains_key(&ch)
})
.await;
if !known {
tracing::debug!(
%signer, channel_id = %channel_id,
"dropping VoiceSignal: channel_id not in ServerState.channels"
);
return;
}
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceSignal {
channel_id,
Expand Down Expand Up @@ -620,7 +703,7 @@ async fn process_received_message<T: TopicHandle>(
mod tests {
//! Listener tests for the JoinRequest signer guard (SEC-A-03 / #239).
use super::*;
use crate::test_client;
use crate::{test_client, ClientHandle};
use std::sync::Arc;
use willow_network::Network;

Expand Down Expand Up @@ -946,6 +1029,270 @@ mod tests {
);
}

// ─── [SEC-V-03] / #303 — voice.participants growth caps ────────────────

/// Build a `ListenerCtx` from a `test_client()` ClientHandle.
fn make_ctx<N: willow_network::Network>(client: &ClientHandle<N>) -> ListenerCtx {
ListenerCtx {
event_state: client.event_state_addr.clone(),
chat_meta: client.chat_meta_addr.clone(),
profiles: client.profile_state_addr.clone(),
network: client.network_meta_addr.clone(),
voice: client.voice_state_addr.clone(),
persistence: client.persistence_addr.clone(),
persistence_enabled: false,
event_broker: client.event_broker.clone(),
identity: client.identity.clone(),
join_links: Arc::clone(&client.join_links),
dag: client.dag_addr.clone(),
server_registry: client.server_registry_addr.clone(),
on_neighbor_up: None,
}
}

/// Helper to read the current `VoiceState`.
async fn voice_snapshot<N: willow_network::Network>(
client: &ClientHandle<N>,
) -> state_actors::VoiceState {
willow_actor::state::select(&client.voice_state_addr, |v| v.clone()).await
}

/// Resolve the test client's known channel id (the "general" channel
/// seeded by `test_client()`).
async fn known_channel_id<N: willow_network::Network>(client: &ClientHandle<N>) -> String {
let snap = client.state_snapshot().await;
snap.channels
.keys()
.next()
.cloned()
.expect("test_client must seed at least one channel")
}

/// VoiceJoin against an unknown `channel_id` must not mutate
/// `VoiceState.participants` (defends against attackers flooding
/// random channel ids until memory is exhausted).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn voice_join_with_unknown_channel_is_dropped() {
let (client, _rx) = test_client();
let hub = willow_network::mem::MemHub::new();
let net = willow_network::mem::MemNetwork::new(&hub);
let (topic_handle, _events) = net
.subscribe(willow_network::topic_id("unit-test-voice-unknown"), vec![])
.await
.expect("subscribe must succeed");
let ctx = make_ctx(&client);

let attacker = willow_identity::Identity::generate();
let attacker_id = attacker.endpoint_id();

let msg = crate::ops::WireMessage::VoiceJoin {
channel_id: "no-such-channel".to_string(),
peer_id: attacker_id,
};
let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed");
process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await;

// Brief settle window for actor mailboxes.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

let snap = voice_snapshot(&client).await;
assert!(
snap.participants.is_empty(),
"VoiceJoin against an unknown channel_id must not create an entry; got {:?}",
snap.participants
);
}

/// Sanity check: VoiceJoin against a real channel records the
/// participant. Without this we wouldn't know the cap test below
/// is exercising the real path.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn voice_join_with_known_channel_records_participant() {
let (client, _rx) = test_client();
let hub = willow_network::mem::MemHub::new();
let net = willow_network::mem::MemNetwork::new(&hub);
let (topic_handle, _events) = net
.subscribe(willow_network::topic_id("unit-test-voice-known"), vec![])
.await
.expect("subscribe must succeed");
let ctx = make_ctx(&client);
let channel_id = known_channel_id(&client).await;

let attacker = willow_identity::Identity::generate();
let attacker_id = attacker.endpoint_id();
let msg = crate::ops::WireMessage::VoiceJoin {
channel_id: channel_id.clone(),
peer_id: attacker_id,
};
let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed");
process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await;

let stored = poll_until_some(
|| {
let ch = channel_id.clone();
willow_actor::state::select(&client.voice_state_addr, move |v| {
v.participants
.get(&ch)
.filter(|set| set.contains(&attacker_id))
.map(|set| set.len())
})
},
std::time::Duration::from_secs(2),
)
.await;
assert_eq!(
stored,
Some(1),
"VoiceJoin on a known channel must record the participant"
);
}

/// Once `MAX_VOICE_CHANNELS` distinct channel_ids have been recorded,
/// further VoiceJoin packets that reference *new* channel_ids must be
/// dropped — even if the channel exists in `ServerState`. This caps the
/// outer dimension of `participants`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn voice_join_rejects_when_distinct_channels_at_cap() {
let (client, _rx) = test_client();
let hub = willow_network::mem::MemHub::new();
let net = willow_network::mem::MemNetwork::new(&hub);
let (topic_handle, _events) = net
.subscribe(willow_network::topic_id("unit-test-voice-chcap"), vec![])
.await
.expect("subscribe must succeed");
let ctx = make_ctx(&client);
let channel_id = known_channel_id(&client).await;

// Pre-fill `participants` with `MAX_VOICE_CHANNELS` distinct
// synthetic channel_ids. We seed directly via the actor — the
// listener would normally reject any of these as unknown, but
// we want to test the *cap* in isolation. `state_snapshot()`'s
// single channel is added below as the (cap+1)-th attempt.
willow_actor::state::mutate(&client.voice_state_addr, move |v| {
for i in 0..MAX_VOICE_CHANNELS {
v.participants.entry(format!("filler-{i}")).or_default();
}
})
.await;

let attacker = willow_identity::Identity::generate();
let attacker_id = attacker.endpoint_id();
// Use the *real* channel_id so the existence check passes; the
// cap is what must reject this message.
let msg = crate::ops::WireMessage::VoiceJoin {
channel_id: channel_id.clone(),
peer_id: attacker_id,
};
let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed");
process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

let snap = voice_snapshot(&client).await;
assert_eq!(
snap.participants.len(),
MAX_VOICE_CHANNELS,
"distinct-channels cap must hold against new VoiceJoin entries"
);
assert!(
!snap.participants.contains_key(&channel_id),
"VoiceJoin for a new channel_id must be dropped once the cap is reached"
);
}

/// Once a single channel has `MAX_PARTICIPANTS_PER_CHANNEL` participants,
/// further VoiceJoin packets for that channel must be dropped. This caps
/// the inner dimension of `participants`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn voice_join_rejects_when_participants_at_cap() {
let (client, _rx) = test_client();
let hub = willow_network::mem::MemHub::new();
let net = willow_network::mem::MemNetwork::new(&hub);
let (topic_handle, _events) = net
.subscribe(willow_network::topic_id("unit-test-voice-pcap"), vec![])
.await
.expect("subscribe must succeed");
let ctx = make_ctx(&client);
let channel_id = known_channel_id(&client).await;

// Fill the cap with synthetic participants on the real channel.
let cap_channel = channel_id.clone();
willow_actor::state::mutate(&client.voice_state_addr, move |v| {
let set = v.participants.entry(cap_channel).or_default();
for _ in 0..MAX_PARTICIPANTS_PER_CHANNEL {
set.insert(willow_identity::Identity::generate().endpoint_id());
}
})
.await;

// Confirm preconditions.
let pre = voice_snapshot(&client).await;
assert_eq!(
pre.participants.get(&channel_id).map(|s| s.len()),
Some(MAX_PARTICIPANTS_PER_CHANNEL),
"test setup must fill participants to cap"
);

let attacker = willow_identity::Identity::generate();
let attacker_id = attacker.endpoint_id();
let msg = crate::ops::WireMessage::VoiceJoin {
channel_id: channel_id.clone(),
peer_id: attacker_id,
};
let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed");
process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

let snap = voice_snapshot(&client).await;
let set = snap
.participants
.get(&channel_id)
.expect("channel set must still exist");
assert_eq!(
set.len(),
MAX_PARTICIPANTS_PER_CHANNEL,
"participant cap must hold; got {}",
set.len()
);
assert!(
!set.contains(&attacker_id),
"new participant must be dropped once cap is reached"
);
}

/// VoiceLeave against an unknown channel_id must not create an entry
/// in `participants`. Without the gate, an attacker can still grow
/// the map by sending Leaves first (`get_mut` returns None today, so
/// this is effectively a no-op — but we still want to assert the gate
/// so future refactors don't reintroduce the hole, and we add a
/// `tracing::debug!` for observability symmetry with Join).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn voice_leave_with_unknown_channel_is_dropped() {
let (client, _rx) = test_client();
let hub = willow_network::mem::MemHub::new();
let net = willow_network::mem::MemNetwork::new(&hub);
let (topic_handle, _events) = net
.subscribe(willow_network::topic_id("unit-test-voice-leave"), vec![])
.await
.expect("subscribe must succeed");
let ctx = make_ctx(&client);

let attacker = willow_identity::Identity::generate();
let attacker_id = attacker.endpoint_id();
let msg = crate::ops::WireMessage::VoiceLeave {
channel_id: "no-such-channel".to_string(),
peer_id: attacker_id,
};
let bytes = crate::ops::pack_wire(&msg, &attacker).expect("pack_wire must succeed");
process_received_message(&bytes, attacker_id, &ctx, &topic_handle).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

let snap = voice_snapshot(&client).await;
assert!(
snap.participants.is_empty(),
"VoiceLeave against an unknown channel_id must not produce a state mutation"
);
}

/// Poll a closure that returns `impl Future<Output = Option<T>>` until
/// it yields `Some` or the deadline expires. Returns the last observed
/// value (whether `Some` or `None`).
Expand Down