Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions payjoin-cli/src/app/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use bitcoincore_rpc::RpcApi;
use payjoin::bitcoin::consensus::encode::serialize_hex;
use payjoin::bitcoin::psbt::Psbt;
use payjoin::bitcoin::{Amount, FeeRate};
use payjoin::receive::v2::Receiver;
use payjoin::receive::v2::{
InitialState, PayjoinProposal, ProvisionalProposal, Receiver, UncheckedProposal, WantsInputs,
};
use payjoin::send::v2::{Sender, SenderBuilder};
use payjoin::{bitcoin, Error, Uri};
use tokio::signal;
Expand Down Expand Up @@ -113,7 +115,7 @@ impl App {
#[allow(clippy::incompatible_msrv)]
async fn spawn_payjoin_receiver(
&self,
mut session: Receiver,
mut session: Receiver<InitialState>,
amount: Option<Amount>,
) -> Result<()> {
println!("Receive session established");
Expand All @@ -124,7 +126,7 @@ impl App {
println!("{}", pj_uri);

let mut interrupt = self.interrupt.clone();
let res = tokio::select! {
let receiver = tokio::select! {
res = self.long_poll_fallback(&mut session) => res,
_ = interrupt.changed() => {
println!("Interrupted. Call the `resume` command to resume all sessions.");
Expand All @@ -133,10 +135,14 @@ impl App {
}?;

println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
println!("{}", serialize_hex(&res.extract_tx_to_schedule_broadcast()));
let mut payjoin_proposal = self
.process_v2_proposal(res)
.map_err(|e| anyhow!("Failed to process proposal {}", e))?;
println!("{}", serialize_hex(&receiver.extract_tx_to_schedule_broadcast()));
let mut payjoin_proposal = match self.process_v2_proposal(receiver.clone()) {
Ok(proposal) => proposal,
Err(e) => {
handle_request_error(e, receiver, &self.config.ohttp_relay).await?;
unreachable!("handle_request_error always returns Err")
}
};
let (req, ohttp_ctx) = payjoin_proposal
.extract_v2_req()
.map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
Expand Down Expand Up @@ -236,8 +242,8 @@ impl App {

async fn long_poll_fallback(
&self,
session: &mut payjoin::receive::v2::Receiver,
) -> Result<payjoin::receive::v2::UncheckedProposal> {
session: &mut Receiver<InitialState>,
) -> Result<Receiver<UncheckedProposal>> {
loop {
let (req, context) = session.extract_req()?;
println!("Polling receive request...");
Expand All @@ -254,8 +260,8 @@ impl App {

fn process_v2_proposal(
&self,
proposal: payjoin::receive::v2::UncheckedProposal,
) -> Result<payjoin::receive::v2::PayjoinProposal, Error> {
proposal: Receiver<UncheckedProposal>,
) -> Result<Receiver<PayjoinProposal>, Error> {
let bitcoind = self.bitcoind().map_err(|e| Error::Server(e.into()))?;

// in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
Expand Down Expand Up @@ -333,10 +339,29 @@ impl App {
}
}

/// Handle request error by sending an error response to the sender and processing the error response
async fn handle_request_error(
e: Error,
mut receiver: Receiver<UncheckedProposal>,
ohttp_relay: &payjoin::Url,
) -> Result<(), anyhow::Error> {
let (err_req, err_ctx) = receiver
.extract_err_req(e, ohttp_relay)
.map_err(|e| anyhow!("Failed to extract error request: {}", e))?;

let err_response = post_request(err_req).await?;

let err_bytes = err_response.bytes().await?;
receiver
.process_err_res(&err_bytes, err_ctx)
.map_err(|e| anyhow!("Failed to process error response: {}", e))?;
Err(anyhow!("Failed to process proposal"))
}

fn try_contributing_inputs(
payjoin: payjoin::receive::v2::WantsInputs,
payjoin: Receiver<WantsInputs>,
bitcoind: &bitcoincore_rpc::Client,
) -> Result<payjoin::receive::v2::ProvisionalProposal> {
) -> Result<Receiver<ProvisionalProposal>> {
let candidate_inputs = bitcoind
.list_unspent(None, None, None, None, None)
.context("Failed to list unspent from bitcoind")?
Expand Down
9 changes: 5 additions & 4 deletions payjoin-cli/src/db/v2.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use bitcoincore_rpc::jsonrpc::serde_json;
use payjoin::receive::v2::Receiver;
use payjoin::receive::v2::{InitialState, Receiver};
use payjoin::send::v2::Sender;
use sled::{IVec, Tree};
use url::Url;

use super::*;

impl Database {
pub(crate) fn insert_recv_session(&self, session: Receiver) -> Result<()> {
pub(crate) fn insert_recv_session(&self, session: Receiver<InitialState>) -> Result<()> {
let recv_tree = self.0.open_tree("recv_sessions")?;
let key = &session.id();
let value = serde_json::to_string(&session).map_err(Error::Serialize)?;
Expand All @@ -16,12 +16,13 @@ impl Database {
Ok(())
}

pub(crate) fn get_recv_sessions(&self) -> Result<Vec<Receiver>> {
pub(crate) fn get_recv_sessions(&self) -> Result<Vec<Receiver<InitialState>>> {
let recv_tree = self.0.open_tree("recv_sessions")?;
let mut sessions = Vec::new();
for item in recv_tree.iter() {
let (_, value) = item?;
let session: Receiver = serde_json::from_slice(&value).map_err(Error::Deserialize)?;
let session: Receiver<InitialState> =
serde_json::from_slice(&value).map_err(Error::Deserialize)?;
sessions.push(session);
}
Ok(sessions)
Expand Down
13 changes: 12 additions & 1 deletion payjoin/src/receive/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@ pub enum Error {
/// To be returned as HTTP 400
BadRequest(RequestError),
// To be returned as HTTP 500
Server(Box<dyn error::Error>),
Server(Box<dyn error::Error + Send + Sync>),
}

impl Error {
pub fn to_json(&self) -> String {
match self {
Self::BadRequest(e) => e.to_string(),
Self::Server(_) =>
"{{ \"errorCode\": \"server-error\", \"message\": \"Internal server error\" }}"
.to_string(),
}
}
}

impl fmt::Display for Error {
Expand Down
5 changes: 5 additions & 0 deletions payjoin/src/receive/v2/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub(crate) enum InternalSessionError {
OhttpEncapsulation(OhttpEncapsulationError),
/// Unexpected response size
UnexpectedResponseSize(usize),
/// Unexpected status code
UnexpectedStatusCode(http::StatusCode),
}

impl fmt::Display for SessionError {
Expand All @@ -28,6 +30,8 @@ impl fmt::Display for SessionError {
size,
crate::ohttp::ENCAPSULATED_MESSAGE_BYTES
),
InternalSessionError::UnexpectedStatusCode(status) =>
write!(f, "Unexpected status code: {}", status),
}
}
}
Expand All @@ -38,6 +42,7 @@ impl error::Error for SessionError {
InternalSessionError::Expired(_) => None,
InternalSessionError::OhttpEncapsulation(e) => Some(e),
InternalSessionError::UnexpectedResponseSize(_) => None,
InternalSessionError::UnexpectedStatusCode(_) => None,
}
}
}
Expand Down
Loading