Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8f8a125
fix(messaging): replace O(N²) sort-on-insert with BTreeMap channel in…
intendednull Apr 12, 2026
7bac12e
network: remove ConnectionEvent / connection_events stub (#119)
intendednull Apr 12, 2026
ab78113
state: O(1) message lookup via index (#122) and log buffer evictions …
intendednull Apr 12, 2026
344b12b
crypto: add RatchetCache to eliminate O(counter) replay on decrypt
intendednull Apr 12, 2026
f7ebf1e
fix(client): add actor timeouts (#130) and safe UUID parsing in topic…
intendednull Apr 12, 2026
c547e72
worker: sign WorkerWireMessage with Ed25519 (#117)
intendednull Apr 12, 2026
afdf3ab
web: replace browser-API .unwrap() with graceful error handling (#128…
intendednull Apr 12, 2026
5fc1320
Merge fix/108/state: O(1) message index (#122) and log evictions (#123)
intendednull Apr 12, 2026
e1b249f
Merge fix/108/crypto: RatchetCache to avoid O(counter) replay (#120)
intendednull Apr 12, 2026
6e98dbf
Merge fix/108/messaging: BTreeMap channel index (#121)
intendednull Apr 12, 2026
401d362
Merge fix/108/network: remove connection_events placeholder (#119)
intendednull Apr 12, 2026
8e4610a
Merge fix/108/worker: sign WorkerWireMessage with Ed25519 (#117)
intendednull Apr 12, 2026
31e864b
Merge fix/108/web: replace browser-API unwraps (#128, #129)
intendednull Apr 12, 2026
82f8807
Merge fix/108/client: actor timeouts (#130) and safe UUID parsing (#141)
intendednull Apr 12, 2026
ca2f804
lint: enable clippy::let_underscore_must_use workspace-wide (#131)
intendednull Apr 12, 2026
19c42a4
Merge fix/108/ci: enable clippy::let_underscore_must_use workspace-wi…
intendednull Apr 12, 2026
7925191
docs(common): update WorkerWireMessage security note to reflect Ed255…
intendednull Apr 12, 2026
061028b
fix: remaining .unwrap() in voice.rs and unused label on WASM
intendednull Apr 12, 2026
b503a83
fix: performance audit findings — HashMap index, ratchet guard, warn …
intendednull Apr 12, 2026
64396df
test: add missing tests flagged by coverage audit
intendednull Apr 12, 2026
3a6c3c8
style: cargo fmt
intendednull Apr 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Synchronization primitives (poison-free, WASM-compatible)
parking_lot = "0.12"

[workspace.lints.clippy]
let_underscore_must_use = "deny"
2 changes: 1 addition & 1 deletion crates/actor/src/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<A: Actor> Addr<A> {
let envelope = envelope::envelope_send(msg);
// Channel could close between our check and send — that's fine,
// the message is lost (same as any async channel race).
let _ = self.tx.send(envelope);
self.tx.send(envelope).ok();
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions crates/actor/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<A: Actor> Context<A> {
}
None => {
let envelope = envelope::envelope_stream_finished::<A, S>();
let _ = tx.send(envelope);
tx.send(envelope).ok();
break;
}
}
Expand Down Expand Up @@ -138,7 +138,7 @@ impl<A: Actor> Context<A> {
}

let envelope = envelope::envelope_send::<A, M>(msg);
let _ = tx.send(envelope);
tx.send(envelope).ok();
});

TimerHandle { cancelled }
Expand Down
6 changes: 3 additions & 3 deletions crates/actor/src/debounce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<M: Message<Result = ()> + Send + 'static> Handler<Enqueue<M>> for Debounce<
impl<M: Message<Result = ()> + Send + 'static> Handler<Flush> for Debounce<M> {
fn handle(&mut self, _msg: Flush, _ctx: &mut Context<Self>) -> impl Future<Output = ()> + Send {
if let Some(pending) = self.pending.take() {
let _ = self.target.do_send(pending);
self.target.do_send(pending).ok();
}
async {}
}
Expand Down Expand Up @@ -120,7 +120,7 @@ impl<M: Message<Result = ()> + Send + 'static> Handler<Enqueue<M>> for Throttle<
ctx: &mut Context<Self>,
) -> impl Future<Output = ()> + Send {
if !self.cooling_down {
let _ = self.target.do_send(msg.0);
self.target.do_send(msg.0).ok();
self.cooling_down = true;
self._timer = Some(ctx.run_after(self.interval, CooldownExpired));
} else {
Expand All @@ -139,7 +139,7 @@ impl<M: Message<Result = ()> + Send + 'static> Handler<CooldownExpired> for Thro
self.cooling_down = false;
self._timer = None;
if let Some(pending) = self.pending.take() {
let _ = self.target.do_send(pending);
self.target.do_send(pending).ok();
self.cooling_down = true;
self._timer = Some(ctx.run_after(self.interval, CooldownExpired));
}
Expand Down
6 changes: 3 additions & 3 deletions crates/actor/src/derived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<Src: DeriveSource, T: PartialEq + Send + Sync + 'static> Actor for DerivedA
crate::runtime::spawn(async move {
let snapshot = sources.snapshot().await;
let value = selector(&snapshot);
let _ = addr.do_send(UpdateCache(value));
addr.do_send(UpdateCache(value)).ok();
});

async {}
Expand All @@ -151,7 +151,7 @@ impl<Src: DeriveSource, T: PartialEq + Send + Sync + 'static> Handler<Notify>
crate::runtime::spawn(async move {
let snapshot = sources.snapshot().await;
let value = selector(&snapshot);
let _ = addr.do_send(UpdateCache(value));
addr.do_send(UpdateCache(value)).ok();
});
async {}
}
Expand Down Expand Up @@ -241,7 +241,7 @@ impl<Src: DeriveSource, T: PartialEq + Send + Sync + 'static> From<&Addr<Derived

StateRef::new(
Arc::new(move |recipient| {
let _ = addr_sub.do_send(Subscribe(recipient));
addr_sub.do_send(Subscribe(recipient)).ok();
}),
Arc::new(move || {
let addr = addr_get.clone();
Expand Down
2 changes: 1 addition & 1 deletion crates/actor/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
Box::new(move |actor: &mut A, ctx: &mut Context<A>| {
Box::pin(async move {
let result = actor.handle(msg, ctx).await;
let _ = reply_tx.send(result);
reply_tx.send(result).ok();
})
})
}
Expand Down
2 changes: 1 addition & 1 deletion crates/actor/src/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl<M: StateMachine> From<&Addr<FsmActor<M>>> for StateRef<M::State> {

StateRef::new(
Arc::new(move |recipient| {
let _ = addr_sub.do_send(Subscribe(recipient));
addr_sub.do_send(Subscribe(recipient)).ok();
}),
Arc::new(move || {
let addr = addr_get.clone();
Expand Down
2 changes: 1 addition & 1 deletion crates/actor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ pub async fn run_mailbox<A: Actor>(

actor.stopped().await;
trace!("actor stopped");
let _ = done.send(());
done.send(()).ok();
}
2 changes: 1 addition & 1 deletion crates/actor/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<T: Send + 'static> OneshotTx<T> {
// futures_channel::oneshot::Sender::send returns Err(T) on failure,
// but we need Err(T) for our API. The cancellation error doesn't
// carry the value, so we reconstruct it.
self.0.send(val).map_err(|e| e)
self.0.send(val)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/actor/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ where
A: Handler<Notify>,
{
let recipient: Recipient<Notify> = subscriber.into();
let _ = state.do_send(Subscribe(recipient));
state.do_send(Subscribe(recipient)).ok();
}

// ───── StateRef ───────────────────────────────────────────────────────────
Expand Down Expand Up @@ -285,7 +285,7 @@ impl<S: Send + Sync + 'static> From<&Addr<StateActor<S>>> for StateRef<S> {

Self {
subscribe_fn: Arc::new(move |recipient| {
let _ = addr_sub.do_send(Subscribe(recipient));
addr_sub.do_send(Subscribe(recipient)).ok();
}),
get_fn: Arc::new(move || {
let addr = addr_get.clone();
Expand Down
2 changes: 1 addition & 1 deletion crates/actor/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,5 @@ async fn run_mailbox_inline<A: Actor>(
}

actor.stopped().await;
let _ = done.send(());
done.send(()).ok();
}
14 changes: 8 additions & 6 deletions crates/actor/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl System {
};

for rx in done_rxs {
let _ = rx.recv().await;
rx.recv().await.ok();
}
}
}
Expand Down Expand Up @@ -202,17 +202,19 @@ impl SystemHandle {
Box::new(move || {
stop.store(true, Ordering::SeqCst);
let noop: BoxEnvelope<A> = Box::new(|_actor, _ctx| Box::pin(async {}));
let _ = tx.send(noop);
tx.send(noop).ok();
}) as Box<dyn Fn() + Send>
};

// Register for shutdown tracking (fire-and-forget).
// FIFO ordering guarantees this is processed before any
// subsequent Shutdown message.
let _ = self.system_addr.do_send(Register(ActorEntry {
signal_stop,
done_rx: Some(done_rx),
}));
self.system_addr
.do_send(Register(ActorEntry {
signal_stop,
done_rx: Some(done_rx),
}))
.ok();

addr
}
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ impl<N: Network> WillowToolRouter<N> {
let p: AuthorizeWorkersParams = parse_args(&args)?;
let eids = parse_endpoint_ids(&p.worker_peer_ids)
.map_err(|e| ErrorData::invalid_params(e, None))?;
let _ = self.client.authorize_workers(&eids).await;
self.client.authorize_workers(&eids).await.ok();
success_json(serde_json::json!({"success": true}))
}

Expand Down
45 changes: 34 additions & 11 deletions crates/client/src/accessors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::util;

impl<N: willow_network::Network> ClientHandle<N> {
pub fn identity(&self) -> Identity {
Expand Down Expand Up @@ -67,18 +68,28 @@ impl<N: willow_network::Network> ClientHandle<N> {

pub async fn event_messages(&self, channel_id: &str) -> Vec<willow_state::ChatMessage> {
let cid = channel_id.to_string();
willow_actor::state::select(&self.event_state_addr, move |es| {
es.messages
.iter()
.filter(|m| m.channel_id == cid && !m.deleted)
.cloned()
.collect()
let addr = self.event_state_addr.clone();
util::with_timeout("event_messages", async move {
willow_actor::state::select(&addr, move |es| {
es.messages
.iter()
.filter(|m| m.channel_id == cid && !m.deleted)
.cloned()
.collect()
})
.await
})
.await
.unwrap_or_default()
}

pub async fn peers(&self) -> Vec<willow_identity::EndpointId> {
willow_actor::state::select(&self.chat_meta_addr, |c| c.peers.clone()).await
let addr = self.chat_meta_addr.clone();
util::with_timeout("peers", async move {
willow_actor::state::select(&addr, |c| c.peers.clone()).await
})
.await
.unwrap_or_default()
}

pub async fn server_members(&self) -> Vec<(willow_identity::EndpointId, String, bool)> {
Expand All @@ -94,7 +105,12 @@ impl<N: willow_network::Network> ClientHandle<N> {
}

pub async fn is_connected(&self) -> bool {
willow_actor::state::select(&self.network_meta_addr, |n| n.connected).await
let addr = self.network_meta_addr.clone();
util::with_timeout("is_connected", async move {
willow_actor::state::select(&addr, |n| n.connected).await
})
.await
.unwrap_or(false)
}

pub async fn has_permission(
Expand All @@ -104,10 +120,12 @@ impl<N: willow_network::Network> ClientHandle<N> {
) -> bool {
let pid = *peer_id;
let p = *perm;
willow_actor::state::select(&self.event_state_addr, move |es| {
es.has_permission(&pid, &p)
let addr = self.event_state_addr.clone();
util::with_timeout("has_permission", async move {
willow_actor::state::select(&addr, move |es| es.has_permission(&pid, &p)).await
})
.await
.unwrap_or(false)
}

pub async fn roles_data(&self) -> Vec<(String, String, Vec<String>)> {
Expand All @@ -127,7 +145,12 @@ impl<N: willow_network::Network> ClientHandle<N> {
/// Check if a peer is an admin.
pub async fn is_admin(&self, peer_id: &willow_identity::EndpointId) -> bool {
let pid = *peer_id;
willow_actor::state::select(&self.event_state_addr, move |es| es.is_admin(&pid)).await
let addr = self.event_state_addr.clone();
util::with_timeout("is_admin", async move {
willow_actor::state::select(&addr, move |es| es.is_admin(&pid)).await
})
.await
.unwrap_or(false)
}

/// Get the set of admin EndpointIds.
Expand Down
6 changes: 3 additions & 3 deletions crates/client/src/joining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ impl<N: willow_network::Network> ClientHandle<N> {
}

// Open event store on persistence actor.
let _ = self
.persistence_addr
self.persistence_addr
.do_send(persistence_actor::OpenEventStore {
server_id: accepted.server_id.clone(),
});
})
.ok();

// Request sync.
let msg = ops::WireMessage::SyncRequest {
Expand Down
Loading
Loading