
Replace redis with file based mailbox storage#915

Merged
spacebear21 merged 5 commits into payjoin:master from nothingmuch:yeet-redis
Sep 19, 2025

Conversation

@nothingmuch
Contributor

@nothingmuch nothingmuch commented Aug 2, 2025

This PR first makes redis optional, and then removes it entirely, due to various issues with testcontainers and with the complexity of deploying a directory. If we ever want to restore redis support then the feature gating commits should make this a bit easier.

This means the directory can no longer be "horizontally scaled", i.e. it now has stateful in-memory structures for what was previously done using redis's pubsub. This is not an issue because of implicit bounds on throughput (no more than 25 writes/second even with very generous bounds).
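The implicit throughput bound can be sketched with back-of-the-envelope arithmetic; the per-block numbers below are illustrative assumptions for a generous upper bound, not values from the codebase:

```rust
fn main() {
    // Hypothetical generous upper bounds, not constants from the code:
    let mailboxes_per_tx = 2.0; // sender + receiver mailbox per payjoin
    let txs_per_block = 4_000.0; // generous bound on bitcoin txs per block
    let secs_per_block = 600.0; // ~10 minute block target

    let writes_per_sec = mailboxes_per_tx * txs_per_block / secs_per_block;
    println!("{writes_per_sec:.1} writes/s"); // well under 25/s
    assert!(writes_per_sec < 25.0);
}
```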

Depends on #914

closes #419

@nothingmuch
Contributor Author

nothingmuch commented Aug 3, 2025

I spoke with Dan about this and whether or not we should remove redis entirely or keep it feature gated. We both lean towards removing entirely.

My reasoning is that the main benefit of an external database server is the ability to have multiple stateless directory instances. However, it doesn't really make sense to scale this horizontally since the directory's request (honest) throughput is implicitly bound in relation to bitcoin's throughput, on the order of tens of requests per second, so there is no scenario where even the most modest hardware can't handle this with one directory instance or where the database wouldn't itself be the bottleneck.

Anyway I will be rebasing this PR on top of my changes to utilize the config crate in the directory, consistent with the way payjoin-cli uses it, and implementing the plain files on disk approach to persistence since that seems to be the simplest. I will keep the feature gating commit in the history because it might be useful to revert the removal at a later point if we do find a good reason to support redis.

@coveralls
Collaborator

coveralls commented Aug 4, 2025

Pull Request Test Coverage Report for Build 17861723379

Details

  • 532 of 648 (82.1%) changed or added relevant lines in 6 files are covered.
  • 3 unchanged lines in 3 files lost coverage.
  • Overall coverage increased (+0.1%) to 84.724%

Changes Missing Coverage          | Covered Lines | Changed/Added Lines | %
payjoin-test-utils/src/lib.rs     | 6             | 7                   | 85.71%
payjoin-directory/src/config.rs   | 0             | 3                   | 0.0%
payjoin-directory/src/main.rs     | 0             | 3                   | 0.0%
payjoin-directory/src/db/mod.rs   | 0             | 14                  | 0.0%
payjoin-directory/src/lib.rs      | 12            | 41                  | 29.27%
payjoin-directory/src/db/files.rs | 514           | 580                 | 88.62%
Files with Coverage Reduction     | New Missed Lines | %
payjoin-directory/src/config.rs   | 1                | 0.0%
payjoin-directory/src/lib.rs      | 1                | 59.74%
payjoin-directory/src/main.rs     | 1                | 0.0%
Totals Coverage Status
Change from base Build 17861715558: 0.1%
Covered Lines: 8547
Relevant Lines: 10088

💛 - Coveralls

@nothingmuch nothingmuch force-pushed the yeet-redis branch 3 times, most recently from b3219d8 to 88d5b8b Compare August 23, 2025 02:32
@benalleng
Collaborator

benalleng commented Aug 26, 2025

I created 2 tests to cover the mutants for these.
First, a quick helper function for mock errors:

#src/core/receive/v2/mod.rs
    pub(crate) fn mock_err() -> (String, JsonReply) {
        let noop_persister = NoopSessionPersister::default();
        let receiver = Receiver { state: unchecked_proposal_v2_from_test_vector() };
        let server_error = || {
            receiver
                .clone()
                .check_broadcast_suitability(None, |_| Err("mock error".into()))
                .save(&noop_persister)
        };

        let error = server_error().expect_err("Server error should be populated with mock error");
        let res = error.api_error().expect("check_broadcast error should propagate to api error");
        let actual_json = JsonReply::from(&res);
        (res.to_string(), actual_json)
    }
#src/core/receive/v2/session.rs
    #[test]
    fn test_skipped_session_extract_err_request() -> Result<(), BoxError> {
        let ohttp_relay = EXAMPLE_URL.clone();
        let mock_err = mock_err();

        let session_history = SessionHistory { events: vec![SessionEvent::MaybeInputsOwned()] };
        let err_req = session_history.extract_err_req(&ohttp_relay)?;
        assert!(err_req.is_none());

        let session_history = SessionHistory {
            events: vec![
                SessionEvent::MaybeInputsOwned(),
                SessionEvent::SessionInvalid(mock_err.0.clone(), Some(mock_err.1.clone())),
            ],
        };

        let err_req = session_history.extract_err_req(&ohttp_relay)?;
        assert!(err_req.is_none());

        let session_history = SessionHistory {
            events: vec![
                SessionEvent::Created(SHARED_CONTEXT.clone()),
                SessionEvent::MaybeInputsOwned(),
                SessionEvent::SessionInvalid(mock_err.0.clone(), Some(mock_err.1.clone())),
            ],
        };

        let err_req = session_history.extract_err_req(&ohttp_relay)?;
        assert!(err_req.is_none());
        Ok(())
    }

    #[test]
    fn test_session_extract_err_req_reply_key() -> Result<(), BoxError> {
        let proposal = proposal_from_test_vector();
        let ohttp_relay = EXAMPLE_URL.clone();
        let mock_err = mock_err();

        let session_history_one = SessionHistory {
            events: vec![
                SessionEvent::Created(SHARED_CONTEXT.clone()),
                SessionEvent::UncheckedOriginalPsbt((
                    proposal.clone(),
                    Some(crate::HpkeKeyPair::gen_keypair().1),
                )),
                SessionEvent::SessionInvalid(mock_err.0.clone(), Some(mock_err.1.clone())),
            ],
        };

        let err_req_one = session_history_one.extract_err_req(&ohttp_relay)?;
        assert!(err_req_one.is_some());

        let session_history_two = SessionHistory {
            events: vec![
                SessionEvent::Created(SHARED_CONTEXT.clone()),
                SessionEvent::UncheckedOriginalPsbt((
                    proposal.clone(),
                    Some(crate::HpkeKeyPair::gen_keypair().1),
                )),
                SessionEvent::SessionInvalid(mock_err.0, Some(mock_err.1)),
            ],
        };

        let err_req_two = session_history_two.extract_err_req(ohttp_relay)?;
        assert!(err_req_two.is_some());
        assert_ne!(
            session_history_one.session_context().unwrap().reply_key,
            session_history_two.session_context().unwrap().reply_key
        );

        Ok(())
    }

This function can be simplified with the above helper function.

    #[test]
    fn test_extract_err_req() -> Result<(), BoxError> {
        let noop_persister = NoopSessionPersister::default();
        let receiver = Receiver { state: unchecked_proposal_v2_from_test_vector() };
        let server_error = || {
            receiver
                .clone()
                .check_broadcast_suitability(None, |_| Err("mock error".into()))
                .save(&noop_persister)
        };
        let expected_json = serde_json::json!({
            "errorCode": "unavailable",
            "message": "Receiver error"
        });
        let error = server_error().expect_err("Server error should be populated with mock error");
        let res = error.api_error().expect("check_broadcast error should propagate to api error");
        let actual_json = JsonReply::from(&res);
        assert_eq!(actual_json.to_json(), expected_json);
        let (_req, _ctx) = extract_err_req(&actual_json, &*EXAMPLE_URL, &SHARED_CONTEXT)?;
        let internal_error: ReplyableError = InternalPayloadError::MissingPayment.into();
        let (_req, _ctx) =
            extract_err_req(&(&internal_error).into(), &*EXAMPLE_URL, &SHARED_CONTEXT)?;
        Ok(())
    }

#src/core/receive/v2/mod.rs
    #[test]
    fn test_extract_err_req() -> Result<(), BoxError> {
        let receiver = Receiver { state: unchecked_proposal_v2_from_test_vector() };
        let mock_err = mock_err();
        let expected_json = serde_json::json!({
            "errorCode": "unavailable",
            "message": "Receiver error"
        });

        assert_eq!(mock_err.1.to_json(), expected_json);

        let (_req, _ctx) = extract_err_req(&mock_err.1, &*EXAMPLE_URL, &receiver.session_context)?;

        let internal_error: ReplyableError = InternalPayloadError::MissingPayment.into();
        let (_req, _ctx) =
            extract_err_req(&(&internal_error).into(), &*EXAMPLE_URL, &receiver.session_context)?;
        Ok(())
    }

@nothingmuch
Contributor Author

perfect timing! i was just about to do this myself, good thing i checked my phone first... thanks so much!

@nothingmuch nothingmuch changed the title Yeet redis Replace redis with file based mailbox storage Aug 29, 2025
@nothingmuch nothingmuch force-pushed the yeet-redis branch 2 times, most recently from 8f70824 to 9a2dbc4 Compare August 29, 2025 17:47
@nothingmuch nothingmuch marked this pull request as ready for review August 29, 2025 18:15
Collaborator

@spacebear21 spacebear21 left a comment


code ACK, thank you for putting so much thought into this, this PR was a pleasure to read.

I'm not sure what is a good way to test this. How would you suggest validating this change before we deploy it to the production server?

///
/// Defaults to around 2e6, for a generous upper bound rounded up from ~2
/// mailboxes/tx, ~4K txs/block, and ~144 blocks/24h.
const DEFAULT_CAPACITY: usize = 1 << (1 + 12 + 8);
Collaborator


It's unclear to me from the docstring what the special meaning of 1 + 12 + 8 is?

Contributor Author


it's just a power of two that's a loose bound on the estimate in the comment, $2 \cdot 4000 \cdot 144 < 2 \cdot 4096 \cdot 256 = 2^{1 + 12 + 8}$
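That arithmetic can be checked directly; the estimate and constant are from the docstring, and the standalone program below is just for illustration:

```rust
fn main() {
    // The docstring's estimate: ~2 mailboxes/tx * ~4K txs/block * ~144 blocks/24h
    let estimate = 2 * 4_000 * 144; // 1_152_000

    // Rounded up to a convenient power of two: 2 * 4096 * 256
    let capacity = 1usize << (1 + 12 + 8); // 2_097_152, roughly 2e6

    assert_eq!(capacity, 2 * 4096 * 256);
    assert!(estimate < capacity); // the constant loosely bounds the estimate
}
```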

Comment on lines +75 to +86
// XOR data with a random pattern to obfuscate v1 requests
// and writing malicious data such as virus fingerprints
let xor: Vec<u8>;
let xor_file = dir.join("xor.dat");
if fs::try_exists(&xor_file).await? {
xor = fs::read(xor_file).await?;
} else {
xor = OsRng.next_u64().to_ne_bytes().to_vec();
let mut file = fs::File::create_new(xor_file).await?;
file.write_all(&xor).await?;
file.sync_all().await?;
}
Collaborator


This seemingly contradicts the commit message, which states that "v1 requests and responses are not [saved to disk]". Is XORing still useful if we are only writing encrypted v2 requests to disk?

Contributor Author


good catch, the comment predates that change: before it, I hadn't realized there's no point in storing the sender's original PSBT either, since if the sender is gone there's no point in replying. i'll update the comment

honestly produced v2 requests are encrypted, but the directory has no way of verifying that, so a malicious client might get a directory to store something illegal on its drive, or even just something that triggers an antivirus program and causes disruption to the service. so XOR obfuscation makes sense for v2 as well
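As a hedged illustration of the scheme (the helper name and pad size here are hypothetical, not the directory's actual API): XOR with a repeating pad is an involution, so the same function obfuscates payloads on write and recovers them on read.

```rust
// Hypothetical helper; applies a short repeating XOR pad in place.
fn xor_obfuscate(pad: &[u8], data: &mut [u8]) {
    for (i, byte) in data.iter_mut().enumerate() {
        *byte ^= pad[i % pad.len()];
    }
}

fn main() {
    let pad = [0xA5u8; 8]; // stand-in for the random xor.dat contents
    let mut payload = b"encrypted v2 payload".to_vec();
    let original = payload.clone();

    xor_obfuscate(&pad, &mut payload); // obfuscate before writing to disk
    assert_ne!(payload, original); // bytes on disk don't match the payload

    xor_obfuscate(&pad, &mut payload); // XOR again to recover on read
    assert_eq!(payload, original);
}
```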

Comment thread payjoin-directory/src/db/files.rs Outdated
Comment on lines +946 to +957
// FIXME why does this fail?
// db.prune().await.expect("pruning should not fail");
// assert_eq!(db.mailboxes.lock().await.len(), 1);
// mailbox seems to get pruned prematurely
// likely cause is it's in both read and insert queue, and two pruning runs
// are needed to fully clear it?

// mark the mailbox as read
_ = db.wait_for_v2_payload(&id).await.expect("waiting for payload should succeed");

assert_eq!(db.mailboxes.lock().await.len(), 1);
// FIXME db.prune().await.expect("pruning should not fail");
Collaborator


I noticed a few FIXME comments leftover here and in other places, do you plan on addressing these here or defer as follow-ups?

Contributor Author


i'm undecided; there are some hassles with these tests because this uses SystemTime, not tokio::time::Instant (so mocking time is trickier), causing these commented-out assertions to be a bit flaky...

there's no guarantee that filesystem timestamps are monotonically increasing, so if the system clock is readjusted this can cause issues as well, such as nominal creation times in the future, and time arithmetic can fail. i went back and forth between the two time representations and kinda settled on the current implementation, but i don't like the fact that it can't be reliably tested, nor am i 100% convinced that the pruning behavior is entirely correct. the consequences are not very serious if there is a bug, so i'm happy handling it in a followup.

one thing i considered but ultimately decided against, without much conviction, is defining a new Error type that's basically enum { Io(std::io::Error), Time(std::time::SystemTimeError) } to account for the dependence on the clock being correct, but this seemed like it leaked an implementation detail, and there isn't much that could be done to handle that error
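For reference, a minimal sketch of that rejected error type (hypothetical, not part of the merged code); the `SystemTime::duration_since` failure mode is exactly the clock dependence being discussed:

```rust
use std::time::SystemTimeError;

// Hypothetical error type as described in the comment above.
#[derive(Debug)]
enum Error {
    Io(std::io::Error),
    Time(SystemTimeError),
}

impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self { Error::Io(e) }
}

impl From<SystemTimeError> for Error {
    fn from(e: SystemTimeError) -> Self { Error::Time(e) }
}

fn main() {
    use std::time::{SystemTime, UNIX_EPOCH};
    // duration_since fails when "earlier" is actually later, which is
    // what happens when a readjusted clock puts timestamps in the future.
    let res: Result<_, Error> =
        UNIX_EPOCH.duration_since(SystemTime::now()).map_err(Error::from);
    assert!(matches!(res, Err(Error::Time(_))));
}
```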

Collaborator


This seems appropriate to follow up with in or after #1047?

Contributor Author


no, that PR only affects the payjoin crate and this only concerns the directory. but also, the SystemTime dependence is inherent, since that's the type that file timestamps on disk have

@nothingmuch
Contributor Author

code ACK, thank you for putting so much thought into this, this PR was a pleasure to read.

thanks for the kind words! makes me happy to read that

I'm not sure what is a good way to test this. How would you suggest validating this change before we deploy it to the production server?

the tests all pass with this (after #981, before that there were a few tests that depended on overwriting behavior). i also tested manually quite a bit.

something that we could consider is a Db impl that multiplexes to two backends and checks that they agree, to see that we don't regress from redis behavior. but for the basic store & fetch behavior that's kinda trivial; expiry is what makes more of a difference, and that was never really implemented for redis...

i would be interested to explore fuzzing options, that's something i really don't have any experience with but have some understanding of how it works, it seems well suited for this

and another approach would be property testing

@benalleng
Collaborator

benalleng commented Aug 30, 2025

I created a local branch that simply unlocks the integration tests on our ffi layer, it looks promising as we no longer get any of the runtime errors! 🥳

benalleng@483a793

Dart

Python

Note:

there are warnings though in the macos python test

/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/linecache.py:52: ResourceWarning: unclosed <socket.socket fd=23, family=30, type=1, proto=6, laddr=('::1', 49235, 0, 0), raddr=('::1', 49230, 0, 0)>
  def checkcache(filename=None):
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/selector_events.py:879: ResourceWarning: unclosed transport <_SelectorSocketTransport fd=23 read=idle write=<idle, bufsize=0>>
  _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
ResourceWarning: Enable tracemalloc to get the object allocation traceback

Collaborator

@arminsabouri arminsabouri left a comment


utACK/codeACK

Core logic in files.rs checks out with me. I had a couple questions and nits.
Have you thought about deployment strategy? How do / should we migrate existing redis sessions?

Comment thread payjoin-directory/src/lib.rs Outdated
Comment on lines +412 to +413
db::Error::OverCapacity => todo!(), // TODO temporarily unavailable?
db::Error::V1SenderUnavailable => todo!(), // TODO gone?
Collaborator


These may get resolved in a future commit, but wanted to call them out in case they don't.
Either InternalServerError or 503 could make sense here.

Contributor Author


these need to be addressed in this PR, so on point

Collaborator


went with a 503 Service Unavailable error for OverCapacity, and in some discussion with yuval decided that 410 Gone was appropriate for V1SenderUnavailable, as the intention is to say that the sender is gone and not coming back, so the user should try again with a new request.

i'm not 100% sure 410 Gone is appropriate if the same resource would become valid in the near future

some 4xx code that indicates that the client should not retry that particular request, but GET again, and then retry a different request with payjoin proposal

410 seems appropriate; even though its initial intention was SEO, in our case saying that the sender is gone and not coming back so you should try a different request seems to be covered here

Ultimately it hinges on whether we think it appropriate to expect the sender to come back in a v1 payjoin?

Contributor Author


it's appropriate to expect that, the sender can just retry and the receiver can as well (but the receiver should not just retry posting the same PUT request; it should first GET the new request by the sender to ensure it's not stale)

we can just specify (in BIP 77) that in these circumstances the directory should respond with 410, and that it's OK to retry in the specific manner (if it's a fresh request resulting from another GET). so essentially choosing the most appropriate status code to convey the specific failure condition is an aesthetics/bikeshedding thing in practice (but it helps convey the intent to implementors)

alternative codes i've considered: 404, 424 (obscure), 502, or 504 could all be argued to make sense in this context

Collaborator

@benalleng benalleng Sep 17, 2025


but should not just retry posting the same PUT request, but first GET the new request by the sender to ensure it's not stale

I think this is why 410 is appropriate as this implies the request used to get to this point should be discarded and a new GET request is needed.
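The mapping that emerged from this discussion can be sketched as follows; the enum and function below are illustrative stand-ins, not the actual directory code:

```rust
// Hypothetical simplification of the db error variants under discussion.
#[derive(Debug)]
enum DbError {
    OverCapacity,
    V1SenderUnavailable,
}

fn status_code(err: &DbError) -> u16 {
    match err {
        // Capacity may free up later, so signal a retryable server condition.
        DbError::OverCapacity => 503, // Service Unavailable
        // The v1 sender's connection is gone; the receiver should GET a
        // fresh request rather than retry the same PUT.
        DbError::V1SenderUnavailable => 410, // Gone
    }
}

fn main() {
    assert_eq!(status_code(&DbError::OverCapacity), 503);
    assert_eq!(status_code(&DbError::V1SenderUnavailable), 410);
}
```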

Comment thread payjoin-directory/src/db/files.rs Outdated
// Check whether there's already a v1 waiter for this ID; if so the
// write needs to be rejected.
if self.pending_v1.contains_key(id) {
return Err(Error::OverCapacity);
Collaborator


Is this the correct error to return?

Contributor Author


I think so, but the comment should explain why. Basically, it's correct that there's no capacity, though there may be capacity in the future. This should only happen in case of a collision, in which case it makes sense to wait and try again later. If a v1 sender's message gets to the wrong receiver it should be rejected because the payment address is unrecognized; however, for this to happen both sessions would need to fall in a 30 second window or so. Similarly, if an attacker managed to guess the short ID of a v1 session, it's better not to directly leak that it succeeded in doing that (an over-capacity error is still a statistical leak if random v2 writes do work).

Comment thread payjoin-directory/src/db/files.rs Outdated
debug_assert!(self.unread_ttl_at_capacity < self.unread_ttl_below_capacity);
debug_assert!(self.pending_v1.iter().all(|(_, v)| !v.sender.is_closed()));

// Prune in flight requests, these can persist in the case of a
Collaborator


Nit: incomplete comment.

Collaborator


I just added:
"Prune in flight requests, these can persist in the case of an incomplete session"
Not sure if this is actually appropriate here?

Contributor Author


once the last v2 getter for a mailbox times out and there are no more inflight requests, the shared future is still in the table

Comment on lines +410 to +413
pending
.sender
.send(Arc::new(payload))
.expect("sending on oneshot channel must succeed");
Collaborator


Just want to double check my understanding. If the receiver end times out, it will clean up the waitmap entry, correct?

Contributor Author

@nothingmuch nothingmuch Sep 2, 2025


not on its own; that's what the .retain() line in prune does. after the last receiver times out, the shared future will have a refcount of 1, since its only copy is in the waitmap, and those entries will be cleared on the next pass
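The refcount-based pruning described here can be sketched with `Arc::strong_count` and `retain`; the types below are simplified stand-ins, not the actual waitmap:

```rust
use std::sync::Arc;

fn main() {
    // Stand-in for the waitmap: entries pair an ID with a shared future.
    let mut waitmap: Vec<(u32, Arc<&str>)> = vec![(1, Arc::new("shared future"))];

    // While a long-polling reader holds a clone, the refcount is > 1
    // and the prune pass keeps the entry.
    let reader = waitmap[0].1.clone();
    waitmap.retain(|(_, fut)| Arc::strong_count(fut) > 1);
    assert_eq!(waitmap.len(), 1);

    // After the last reader times out and drops its clone, the waitmap
    // holds the only copy (refcount 1), so the next prune pass clears it.
    drop(reader);
    waitmap.retain(|(_, fut)| Arc::strong_count(fut) > 1);
    assert!(waitmap.is_empty());
}
```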

nothingmuch and others added 5 commits September 19, 2025 16:48
This is a refactoring of the existing redis DbPool, simplifying it a
bit and renaming the various methods for clarity.

The trait is async, with its futures also satisfying `Send` in order to
allow its use in the `hyper::Service` trait.

DbPool was renamed to db::redis::Db (reexported as RedisDB) in
anticipation of additional impls of the trait.

Co-authored-by: benalleng <benalleng@gmail.com>
v2 payloads are saved to disk.

v1 requests and responses are not, because they only make sense to keep
around so long as the sender's connection is still active, and because
they are not encrypted.

In-flight requests (long polls) are tracked in waitmaps for notifying
waiting readers of new payloads or v1 responses.

Mailbox contents are retained for up to a week by default, or up to 24
hours at capacity. Read mailboxes have a grace period of 10 minutes
before they are expired.

For simplicity no concurrent hashmap is used, and the mutex is held
during writes to disk. This is because concurrency is implicitly bound
by bitcoin's transaction throughput limits, for the retention times
described above a very generous upper bound is 25 writes per second.
Because only mutual exclusion for the entire structure is used, pruning
is done on an as-needed basis by the locking thread and not in a
background task.

Co-authored-by: benalleng <benalleng@gmail.com>
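The retention policy from the commit message above can be sketched as follows; the durations come from the message itself, while the constant and function names are illustrative, not the actual code:

```rust
use std::time::Duration;

// Hypothetical names; values per the commit message.
const UNREAD_TTL_BELOW_CAPACITY: Duration = Duration::from_secs(7 * 24 * 3600); // up to a week
const UNREAD_TTL_AT_CAPACITY: Duration = Duration::from_secs(24 * 3600); // 24h at capacity
const READ_GRACE_PERIOD: Duration = Duration::from_secs(10 * 60); // 10 minutes after read

fn unread_ttl(at_capacity: bool) -> Duration {
    if at_capacity { UNREAD_TTL_AT_CAPACITY } else { UNREAD_TTL_BELOW_CAPACITY }
}

fn main() {
    // Retention tightens under capacity pressure.
    assert!(unread_ttl(true) < unread_ttl(false));
    // Read mailboxes linger briefly before expiry.
    assert_eq!(READ_GRACE_PERIOD.as_secs(), 600);
}
```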
Collaborator

@arminsabouri arminsabouri left a comment


re-utACK 0b6d029

Reviewed the pushes that occurred from my last ack to now.

help = "The redis host to connect to"
long = "storage-dir",
env = "PJ_STORAGE_DIR",
help = "A directory for writing mailbox data."
Collaborator


Nit: This is the sqlite database, correct? If so, it's not only mailbox data

Contributor Author


no just files on disk

Collaborator

@spacebear21 spacebear21 left a comment


re-ACK 0b6d029 🚀

@spacebear21 spacebear21 merged commit 92a3154 into payjoin:master Sep 19, 2025
10 checks passed
@spacebear21 spacebear21 mentioned this pull request Sep 19, 2025
1 task