Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 137 additions & 9 deletions crates/worker/src/actors/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Uses [`StreamHandler`] to receive gossip events and dispatches
//! parsed messages to the state actor via typed [`Addr`] messages.

use tracing::warn;
use willow_actor::{Actor, Addr, Context, Handler};
use willow_identity::EndpointId;
use willow_network::traits::{GossipEvent, TopicEvents};
Expand Down Expand Up @@ -149,6 +150,18 @@ impl<E: TopicEvents + 'static, T: TopicHandle + 'static> Handler<GossipEventMsg>
}
}
WorkerMessageAction::Ignore => {}
WorkerMessageAction::PeerIdMismatch {
signer,
claimed,
kind,
} => {
warn!(
signer = %signer,
claimed = %claimed,
kind,
"rejecting worker message: self-declared peer_id does not match Ed25519 signer"
);
}
WorkerMessageAction::DeserializeError(_) => {
match parse_server_message(&msg.content) {
ServerMessageAction::Events(events) => {
Expand Down Expand Up @@ -203,6 +216,20 @@ pub enum WorkerMessageAction {
},
/// No action needed (message not for us, or announcement/departure).
Ignore,
/// A self-declared `peer_id` inside the message does not match the
/// Ed25519 signer of the outer envelope. The caller should log this
/// and drop the message.
///
/// This prevents a signer from forging announcements/departures on
/// behalf of a different peer.
PeerIdMismatch {
/// The verified Ed25519 signer of the envelope.
signer: EndpointId,
/// The peer_id claimed inside the inner message.
claimed: EndpointId,
/// Which variant of `WorkerWireMessage` carried the mismatch.
kind: &'static str,
},
/// Could not deserialize the message.
DeserializeError(String),
}
Expand All @@ -211,9 +238,21 @@ pub enum WorkerMessageAction {
///
/// This is a pure function — no I/O, no channels — so it's easily
/// testable. The caller handles the actual I/O.
///
/// ## Security
///
/// Every inbound message is signature-verified by `unpack_wire`, yielding
/// a trusted [`EndpointId`] (the signer). For `WorkerWireMessage`
/// variants that carry a self-declared `peer_id`
/// (`Announcement`, `Departure`), the claimed `peer_id` must equal the
/// verified signer; otherwise the message is rejected with
/// [`WorkerMessageAction::PeerIdMismatch`]. This stops a signer from
/// forging announcements or graceful-departure notifications on behalf
/// of another peer, which would let them poison a recipient's
/// `WorkerCache` / evict legitimate workers.
pub fn parse_worker_message(data: &[u8], local_peer_id: &EndpointId) -> WorkerMessageAction {
let msg = match willow_common::unpack_wire(data) {
Some((willow_common::WireMessage::Worker(m), _signer)) => m,
let (msg, signer) = match willow_common::unpack_wire(data) {
Some((willow_common::WireMessage::Worker(m), signer)) => (m, signer),
Some(_) => return WorkerMessageAction::Ignore,
None => {
return WorkerMessageAction::DeserializeError(
Expand All @@ -240,9 +279,31 @@ pub fn parse_worker_message(data: &[u8], local_peer_id: &EndpointId) -> WorkerMe
WorkerMessageAction::Ignore
}
}
WorkerWireMessage::Response { .. }
| WorkerWireMessage::Announcement(_)
| WorkerWireMessage::Departure { .. } => WorkerMessageAction::Ignore,
WorkerWireMessage::Announcement(ann) => {
if ann.peer_id != signer {
WorkerMessageAction::PeerIdMismatch {
signer,
claimed: ann.peer_id,
kind: "Announcement",
}
} else {
// Verified — but still a no-op here; the WorkerCache
// integration consumes announcements elsewhere.
WorkerMessageAction::Ignore
}
}
WorkerWireMessage::Departure { peer_id } => {
if peer_id != signer {
WorkerMessageAction::PeerIdMismatch {
signer,
claimed: peer_id,
kind: "Departure",
}
} else {
WorkerMessageAction::Ignore
}
}
WorkerWireMessage::Response { .. } => WorkerMessageAction::Ignore,
}
}

Expand Down Expand Up @@ -355,11 +416,14 @@ mod tests {
}

#[test]
fn parse_worker_announcement_ignored() {
fn parse_worker_announcement_ignored_when_peer_id_matches_signer() {
// Happy path: signer.peer_id == announcement.peer_id → verified,
// then Ignore (no-op at the parse layer; WorkerCache handles it elsewhere).
let signer = Identity::generate();
let signer_id = signer.endpoint_id();
let my_id = gen_id();
let msg = WorkerWireMessage::Announcement(willow_common::WorkerAnnouncement {
peer_id: gen_id(),
peer_id: signer_id,
role: willow_common::WorkerRoleInfo::Replay {
servers_loaded: 1,
events_buffered: 0,
Expand All @@ -377,10 +441,47 @@ mod tests {
}

#[test]
fn parse_worker_departure_ignored() {
fn parse_worker_announcement_rejected_when_peer_id_mismatches_signer() {
// Sad path: signer.peer_id != announcement.peer_id → rejected with
// PeerIdMismatch so the caller can log and drop it. Without this
// check a signer could forge announcements for another peer and
// poison the recipient's WorkerCache.
let signer = Identity::generate();
let signer_id = signer.endpoint_id();
let impersonated = gen_id();
let my_id = gen_id();
let msg = WorkerWireMessage::Announcement(willow_common::WorkerAnnouncement {
peer_id: impersonated,
role: willow_common::WorkerRoleInfo::Replay {
servers_loaded: 1,
events_buffered: 0,
max_events: 1000,
},
servers: vec![],
timestamp: 0,
});
let data = pack_worker(msg, &signer);

match parse_worker_message(&data, &my_id) {
WorkerMessageAction::PeerIdMismatch {
signer: reported_signer,
claimed,
kind,
} => {
assert_eq!(reported_signer, signer_id);
assert_eq!(claimed, impersonated);
assert_eq!(kind, "Announcement");
}
other => panic!("expected PeerIdMismatch, got {:?}", other),
}
}

#[test]
fn parse_worker_departure_ignored_when_peer_id_matches_signer() {
let signer = Identity::generate();
let signer_id = signer.endpoint_id();
let my_id = gen_id();
let msg = WorkerWireMessage::Departure { peer_id: gen_id() };
let msg = WorkerWireMessage::Departure { peer_id: signer_id };
let data = pack_worker(msg, &signer);

assert!(matches!(
Expand All @@ -389,6 +490,33 @@ mod tests {
));
}

#[test]
fn parse_worker_departure_rejected_when_peer_id_mismatches_signer() {
// Without this check a signer could forge a Departure for another
// peer and evict them from the recipient's WorkerCache.
let signer = Identity::generate();
let signer_id = signer.endpoint_id();
let impersonated = gen_id();
let my_id = gen_id();
let msg = WorkerWireMessage::Departure {
peer_id: impersonated,
};
let data = pack_worker(msg, &signer);

match parse_worker_message(&data, &my_id) {
WorkerMessageAction::PeerIdMismatch {
signer: reported_signer,
claimed,
kind,
} => {
assert_eq!(reported_signer, signer_id);
assert_eq!(claimed, impersonated);
assert_eq!(kind, "Departure");
}
other => panic!("expected PeerIdMismatch, got {:?}", other),
}
}

#[test]
fn parse_worker_response_ignored() {
let signer = Identity::generate();
Expand Down
Loading