diff --git a/payjoin-directory/src/db/files.rs b/payjoin-directory/src/db/files.rs index 3552827ad..4ac022c39 100644 --- a/payjoin-directory/src/db/files.rs +++ b/payjoin-directory/src/db/files.rs @@ -37,7 +37,9 @@ struct V2WaitMapEntry { #[derive(Debug)] struct V1WaitMapEntry { - payload: Arc>, + /// The V1 payload. `take()`n after the first read for data minimization — + /// plaintext PSBTs should not linger in memory longer than needed. + payload: Option>>, sender: oneshot::Sender>, } @@ -325,9 +327,12 @@ impl DbTrait for Db { impl Mailboxes { async fn read(&mut self, id: &ShortId) -> io::Result>>> { // V1 POST requests are only stored in memory since they are - // unencrypted. Check this hash table first. - if let Some(V1WaitMapEntry { payload, .. }) = self.pending_v1.get(id) { - return Ok(Some(payload.clone())); + // unencrypted. Check this hash table first. Use take() for data + // minimization — clear the plaintext payload after first read. + if let Some(entry) = self.pending_v1.get_mut(id) { + if let Some(payload) = entry.payload.take() { + return Ok(Some(payload)); + } } // V2 requests are stored on disk @@ -358,8 +363,11 @@ impl Mailboxes { return Err(Error::OverCapacity); } - if self.pending_v1.contains_key(id) { - return Err(Error::OverCapacity); + if let Some(entry) = self.pending_v1.get(id) { + if entry.payload.is_some() { + return Err(Error::OverCapacity); + } + return Err(Error::AlreadyRead); } let receiver = self @@ -419,13 +427,17 @@ impl Mailboxes { let payload = payload.clone(); let (sender, receiver) = oneshot::channel::>(); ret = Some(receiver); - V1WaitMapEntry { payload, sender } + V1WaitMapEntry { payload: Some(payload), sender } }); - // If there are pending readers, satisfy them and mark the payload as read + // If there are pending readers, satisfy them with the payload + // and clear the in-memory copy for data minimization if let Some(pending) = self.pending_v2.remove(id) { trace!("notifying pending readers for {} (v1 fallback)", id); - pending.sender.send(payload).expect("sending on oneshot channel must succeed"); + pending.sender.send(payload.clone()).expect("sending on oneshot channel must succeed"); + if let Some(entry) = self.pending_v1.get_mut(id) { + entry.payload.take(); + } } Ok(ret) @@ -568,6 +580,9 @@ pub enum Error { /// Operation rejected due to lack of capacity OverCapacity, + /// Indicates receiver already consumed the plaintext V1 request payload + AlreadyRead, + /// Indicates the sender that was waiting for the reply is no longer there V1SenderUnavailable, @@ -584,6 +599,7 @@ impl From for super::Error { match val { Error::V1SenderUnavailable => super::Error::V1SenderUnavailable, Error::OverCapacity => super::Error::OverCapacity, + Error::AlreadyRead => super::Error::AlreadyRead, Error::IO(e) => super::Error::Operational(e), } } @@ -603,6 +619,7 @@ impl std::fmt::Display for Error { use Error::*; match self { OverCapacity => "Database over capacity".fmt(f), + AlreadyRead => "Mailbox payload already read".fmt(f), V1SenderUnavailable => "Sender no longer connected".fmt(f), IO(e) => write!(f, "Internal Error: {e}"), } @@ -780,7 +797,7 @@ async fn test_v2_wait() -> std::io::Result<()> { match db.wait_for_v2_payload(&id).await { Err(super::Error::Timeout(_)) => {} - res => panic!("expected timeout, got {:?}", res), + res => panic!("expected timeout, got {res:?}"), } let read_task1 = tokio::spawn({ @@ -870,6 +887,59 @@ async fn test_v1_wait() -> std::io::Result<()> { Ok(()) } +#[tokio::test] +async fn test_v1_data_minimization() -> std::io::Result<()> { + let dir = tempfile::tempdir()?; + + let db = Arc::new( + Db::init(Duration::from_millis(500), dir.path().to_owned()) + .await + .expect("initializing mailbox database should succeed"), + ); + + let id = ShortId([0u8; 8]); + + // Spawn v1 sender in background + let v1_sender_task = tokio::spawn({ + let db = db.clone(); + async move { db.post_v1_request_and_wait_for_response(&id, b"request".to_vec()).await } + }); + + // Small delay to let v1 request post + tokio::time::sleep(Duration::from_millis(10)).await; + + // First read should return the payload + let res = db.wait_for_v2_payload(&id).await.expect("first read should succeed"); + assert_eq!(&res[..], b"request", "first read should return the payload"); + + // Subsequent reads should not return the plaintext payload again. + assert!( + matches!(db.wait_for_v2_payload(&id).await, Err(super::Error::AlreadyRead)), + "subsequent reads should indicate the payload was already consumed" + ); + + // Verify the payload was cleared from memory by checking directly + { + let guard = db.mailboxes.lock().await; + let entry = guard.pending_v1.get(&id); + assert!( + entry.is_none_or(|e| e.payload.is_none()), + "v1 payload should have been cleared after first read" + ); + } + + // V1 response flow should still work even after payload was cleared + db.post_v1_response(&id, b"response".to_vec()).await.expect("posting response should succeed"); + + let res = v1_sender_task + .await + .expect("joining task should succeed") + .expect("v1 sender should get response"); + assert_eq!(&res[..], b"response", "v1 sender should receive the response"); + + Ok(()) +} + // Simulate elapsed time deterministically by shifting stored timestamps // backward instead of sleeping. tokio::time::pause() can't be used because // prune compares against SystemTime (timestamps originate from disk). diff --git a/payjoin-directory/src/db/mod.rs b/payjoin-directory/src/db/mod.rs index b4970a532..4adfdb296 100644 --- a/payjoin-directory/src/db/mod.rs +++ b/payjoin-directory/src/db/mod.rs @@ -16,6 +16,7 @@ pub enum Error { Operational(OperationalError), Timeout(tokio::time::error::Elapsed), OverCapacity, + AlreadyRead, V1SenderUnavailable, } @@ -33,6 +34,7 @@ impl std::fmt::Display for Error { Operational(error) => write!(f, "Db error: {error}"), Timeout(timeout) => write!(f, "Timeout: {timeout}"), OverCapacity => "Database over capacity".fmt(f), + AlreadyRead => "Mailbox payload already read".fmt(f), V1SenderUnavailable => "Sender no longer connected".fmt(f), } } diff --git a/payjoin-directory/src/lib.rs b/payjoin-directory/src/lib.rs index 97b7eea43..fcb783a3b 100644 --- a/payjoin-directory/src/lib.rs +++ b/payjoin-directory/src/lib.rs @@ -453,6 +453,7 @@ fn handle_peek( db::Error::OverCapacity => Err(HandlerError::ServiceUnavailable(anyhow::Error::msg( "mailbox storage at capacity", ))), + db::Error::AlreadyRead => Ok(timeout_response), db::Error::V1SenderUnavailable => Err(HandlerError::SenderGone(anyhow::Error::msg( "Sender is unavailable try a new request", ))),