Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions crates/actor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,112 @@ mod tests {
assert!(tx.try_send(3).is_err());
}

// ───── Bounded mailbox (issue #78) ───────────────────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mailbox_drops_messages_when_capacity_exceeded() {
// Issue #78: Actor mailboxes must be bounded to prevent OOM DoS.
// With unbounded channels, send() would never return Err due to
// capacity — only if the receiver is closed. With bounded channels,
// send() returns Err when the mailbox is full.

// Test directly at the channel level — this is deterministic.
let (tx, _rx) = runtime::channel::<u32>(4);
assert!(tx.send(1).is_ok());
assert!(tx.send(2).is_ok());
assert!(tx.send(3).is_ok());
assert!(tx.send(4).is_ok());
// Channel is full — this should fail.
assert!(
tx.send(5).is_err(),
"expected mailbox full error, but send succeeded (unbounded!)"
);

// Now test via actor spawn_with_capacity — use a slow actor to
// ensure we can fill the mailbox.
struct SlowActor;
impl Actor for SlowActor {}

struct SlowMsg;
impl Message for SlowMsg {
type Result = ();
}
impl Handler<SlowMsg> for SlowActor {
fn handle(
&mut self,
_msg: SlowMsg,
_ctx: &mut Context<Self>,
) -> impl std::future::Future<Output = ()> + Send {
async {
runtime::sleep(Duration::from_millis(200)).await;
}
}
}

let system = System::new();
let addr = system.handle().spawn_with_capacity(SlowActor, 4);

// Send one message to trigger processing (actor will be busy 200ms).
addr.do_send(SlowMsg).unwrap();
// Let the actor dequeue it and begin handling.
runtime::sleep(Duration::from_millis(30)).await;

// Now fill the remaining mailbox capacity (4 slots).
for _ in 0..4 {
let _ = addr.do_send(SlowMsg);
}

// The mailbox should now be full — next send should fail.
let result = addr.do_send(SlowMsg);
assert!(
result.is_err(),
"expected mailbox full error, but send succeeded (unbounded!)"
);

system.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shutdown_succeeds_with_full_mailbox() {
// Ensure shutdown doesn't deadlock when a mailbox is full.
// The stop flag (AtomicBool) is set even if the noop wake-up
// message is dropped due to a full mailbox.
struct SlowActor;
impl Actor for SlowActor {}

struct Block;
impl Message for Block {
type Result = ();
}
impl Handler<Block> for SlowActor {
fn handle(
&mut self,
_msg: Block,
_ctx: &mut Context<Self>,
) -> impl std::future::Future<Output = ()> + Send {
async {
runtime::sleep(Duration::from_millis(50)).await;
}
}
}

let system = System::new();
let addr = system.handle().spawn_with_capacity(SlowActor, 2);

// Fill the mailbox.
for _ in 0..4 {
let _ = addr.do_send(Block);
}

// Shutdown should complete even though the noop wake-up may
// be dropped due to the full mailbox.
let shutdown = tokio::time::timeout(Duration::from_secs(5), system.shutdown());
assert!(
shutdown.await.is_ok(),
"shutdown deadlocked with full mailbox"
);
}

// ───── Supervision ─────────────────────────────────────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down
52 changes: 38 additions & 14 deletions crates/actor/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,23 @@ pub async fn sleep(duration: Duration) {
gloo_timers::future::sleep(duration).await;
}

// ───── Unbounded MPSC channel ──────────────────────────────────────────────
// ───── Default mailbox capacity ───────────────────────────────────────────

/// Sender half of an unbounded MPSC channel.
/// Default capacity for actor mailbox channels.
///
/// This prevents unbounded memory growth from message flooding (OOM DoS).
/// Actors whose mailbox reaches this limit will have messages dropped with
/// a warning log.
pub const DEFAULT_MAILBOX_CAPACITY: usize = 10_000;

// ───── Bounded MPSC channel (actor mailbox) ──────────────────────────────

/// Sender half of a bounded MPSC channel used for actor mailboxes.
#[cfg(not(target_arch = "wasm32"))]
pub struct Sender<T>(tokio::sync::mpsc::UnboundedSender<T>);
pub struct Sender<T>(tokio::sync::mpsc::Sender<T>);

#[cfg(target_arch = "wasm32")]
pub struct Sender<T>(futures_channel::mpsc::UnboundedSender<T>);
pub struct Sender<T>(futures_channel::mpsc::Sender<T>);

impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Expand All @@ -44,16 +53,27 @@ impl<T> Clone for Sender<T> {
}

impl<T: Send + 'static> Sender<T> {
/// Send a value. Returns `Err` if the receiver is closed.
/// Try to send a value. Returns `Err(val)` if the channel is full or closed.
pub fn send(&self, val: T) -> Result<(), T> {
#[cfg(not(target_arch = "wasm32"))]
{
self.0.send(val).map_err(|e| e.0)
self.0.try_send(val).map_err(|e| {
if matches!(e, tokio::sync::mpsc::error::TrySendError::Full(_)) {
tracing::warn!("actor mailbox full, dropping message");
}
e.into_inner()
})
}

#[cfg(target_arch = "wasm32")]
{
self.0.unbounded_send(val).map_err(|e| e.into_inner())
let mut sender = self.0.clone();
sender.try_send(val).map_err(|e| {
if e.is_full() {
tracing::warn!("actor mailbox full, dropping message");
}
e.into_inner()
})
}
}

Expand All @@ -63,12 +83,12 @@ impl<T: Send + 'static> Sender<T> {
}
}

/// Receiver half of an unbounded MPSC channel.
/// Receiver half of a bounded MPSC channel used for actor mailboxes.
#[cfg(not(target_arch = "wasm32"))]
pub struct Receiver<T>(tokio::sync::mpsc::UnboundedReceiver<T>);
pub struct Receiver<T>(tokio::sync::mpsc::Receiver<T>);

#[cfg(target_arch = "wasm32")]
pub struct Receiver<T>(futures_channel::mpsc::UnboundedReceiver<T>);
pub struct Receiver<T>(futures_channel::mpsc::Receiver<T>);

impl<T: Send + 'static> Receiver<T> {
/// Wait for the next value. Returns `None` when all senders are dropped.
Expand Down Expand Up @@ -104,17 +124,21 @@ impl<T: Send + 'static> Receiver<T> {
}
}

/// Create an unbounded MPSC channel.
pub fn unbounded_channel<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
/// Create a bounded MPSC channel with the given capacity.
///
/// This replaces the former `unbounded_channel()` to prevent OOM DoS attacks
/// from message flooding. When the channel is full, `Sender::send()` returns
/// an error and the message is dropped.
pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
#[cfg(not(target_arch = "wasm32"))]
{
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, rx) = tokio::sync::mpsc::channel(capacity);
(Sender(tx), Receiver(rx))
}

#[cfg(target_arch = "wasm32")]
{
let (tx, rx) = futures_channel::mpsc::unbounded();
let (tx, rx) = futures_channel::mpsc::channel(capacity);
(Sender(tx), Receiver(rx))
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/actor/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) fn spawn_supervised<A: Actor + Clone>(
policy: RestartPolicy,
system: SystemHandle,
) -> Addr<A> {
let (tx, rx) = runtime::unbounded_channel::<BoxEnvelope<A>>();
let (tx, rx) = runtime::channel::<BoxEnvelope<A>>(runtime::DEFAULT_MAILBOX_CAPACITY);
let addr = Addr::new(tx.clone());

runtime::spawn(supervisor_loop(actor, policy, rx, tx, system, addr.clone()));
Expand Down
19 changes: 15 additions & 4 deletions crates/actor/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::addr::Addr;
use crate::context::Context;
use crate::envelope::BoxEnvelope;
use crate::mailbox;
use crate::runtime::{self, OneshotRx, Sender};
use crate::runtime::{self, OneshotRx, Sender, DEFAULT_MAILBOX_CAPACITY};

// ───── SystemActor (internal) ──────────────────────────────────────────

Expand Down Expand Up @@ -102,7 +102,7 @@ impl System {
pub fn new() -> Self {
// Bootstrap: spawn the SystemActor directly (it can't
// register itself — it IS the registry).
let (tx, rx) = runtime::unbounded_channel();
let (tx, rx) = runtime::channel(DEFAULT_MAILBOX_CAPACITY);
let addr = Addr::new(tx.clone());
let stop = Arc::new(AtomicBool::new(false));
let (done_tx, _done_rx) = runtime::oneshot();
Expand Down Expand Up @@ -174,7 +174,16 @@ impl SystemHandle {

/// Spawn a top-level actor and return its address.
pub fn spawn<A: Actor>(&self, actor: A) -> Addr<A> {
let (tx, rx) = runtime::unbounded_channel();
self.spawn_with_capacity(actor, DEFAULT_MAILBOX_CAPACITY)
}

/// Spawn a top-level actor with a custom mailbox capacity.
///
/// Use this when an actor has different backpressure needs than the
/// default. Also useful in tests to verify bounded mailbox behavior
/// with a small capacity.
pub fn spawn_with_capacity<A: Actor>(&self, actor: A, capacity: usize) -> Addr<A> {
let (tx, rx) = runtime::channel(capacity);
let addr = Addr::new(tx.clone());
let stop = Arc::new(AtomicBool::new(false));
let (done_tx, done_rx) = runtime::oneshot();
Expand All @@ -184,7 +193,9 @@ impl SystemHandle {
runtime::spawn(mailbox::run_mailbox(actor, ctx, rx, stop.clone(), done_tx));

// Create a stop signal that sets the flag AND sends a no-op envelope
// to wake up the mailbox if it's blocked on recv().
// to wake up the mailbox if it's blocked on recv(). If the mailbox
// is full the noop is dropped, but the stop flag is still set — the
// actor will notice it on the next message or idle cycle.
let signal_stop = {
let stop = stop.clone();
let tx = tx;
Expand Down
3 changes: 2 additions & 1 deletion crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ pub mod event_receiver {
broker: &Addr<Broker<ClientEvent>>,
system: &willow_actor::SystemHandle,
) -> Self {
let (tx, rx) = willow_actor::runtime::unbounded_channel();
let (tx, rx) =
willow_actor::runtime::channel(willow_actor::runtime::DEFAULT_MAILBOX_CAPACITY);
let addr = system.spawn(ForwarderActor { tx });
let recipient = addr.into();
let _ = broker.ask(BrokerSubscribe(recipient)).await;
Expand Down
8 changes: 4 additions & 4 deletions crates/web/src/components/member_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn MemberList(
.iter()
.filter(|(pid, _, _)| {
sync_providers.contains(pid)
&& pid != &peer_id.get_untracked()
&& pid != &peer_id.get()
&& *pid != owner_str
})
.cloned()
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn MemberList(
.filter(|(pid, _, _)| {
// Exclude workers from the members section.
!sync_providers.contains(pid)
|| pid == &peer_id.get_untracked()
|| pid == &peer_id.get()
|| *pid == owner_str
})
.collect::<Vec<_>>()
Expand Down Expand Up @@ -175,7 +175,7 @@ pub fn MemberList(
let hu = handle_untrust.clone();
let pk = pid_kick.clone();
move || {
let is_admin = app_state.server.admin_ids.get().contains(&peer_id.get_untracked());
let is_admin = app_state.server.admin_ids.get().contains(&peer_id.get());
if is_self() || !is_admin {
None
} else {
Expand Down Expand Up @@ -224,7 +224,7 @@ pub fn MemberList(
let sync_providers = app_state.server.sync_provider_ids.get();
let non_worker_count = all.iter().filter(|(pid, _, _)| {
!sync_providers.contains(pid)
|| pid == &peer_id.get_untracked()
|| pid == &peer_id.get()
|| *pid == owner_str
}).count();
if non_worker_count == 0 {
Expand Down
Loading
Loading