From b9cd076c5eae4d13a79ebaec876e6cd4ef1533ec Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Tue, 16 May 2023 12:11:11 -0400 Subject: [PATCH 1/4] LSPS2: JIT Channels --- src/events.rs | 8 +- src/jit_channel/channel_manager.rs | 880 +++++++++++++++++++++++++++++ src/jit_channel/event.rs | 110 ++++ src/jit_channel/mod.rs | 6 +- src/jit_channel/msgs.rs | 252 +++++++++ src/jit_channel/scid_utils.rs | 71 +++ src/lib.rs | 6 +- src/transport/message_handler.rs | 301 +++++++++- src/transport/msgs.rs | 123 +++- src/utils.rs | 10 + 10 files changed, 1746 insertions(+), 21 deletions(-) create mode 100644 src/jit_channel/channel_manager.rs create mode 100644 src/jit_channel/event.rs create mode 100644 src/jit_channel/msgs.rs create mode 100644 src/jit_channel/scid_utils.rs diff --git a/src/events.rs b/src/events.rs index ca03e6c..620223d 100644 --- a/src/events.rs +++ b/src/events.rs @@ -13,6 +13,7 @@ //! Because we don't have a built-in runtime, it's up to the end-user to poll //! [`crate::LiquidityManager::get_and_clear_pending_events()`] to receive events. +use crate::jit_channel; use std::collections::VecDeque; use std::sync::{Condvar, Mutex}; @@ -53,6 +54,9 @@ impl EventQueue { } } -/// Event which you should probably take some action in response to. +/// An Event which you should probably take some action in response to. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum Event {} +pub enum Event { + /// A LSPS2 (JIT Channel) protocol event + LSPS2(jit_channel::event::Event), +} diff --git a/src/jit_channel/channel_manager.rs b/src/jit_channel/channel_manager.rs new file mode 100644 index 0000000..e4c9d75 --- /dev/null +++ b/src/jit_channel/channel_manager.rs @@ -0,0 +1,880 @@ +// This file is Copyright its original authors, visible in version contror +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +use std::collections::HashMap; +use std::convert::TryInto; +use std::ops::Deref; +use std::sync::{Arc, Mutex, RwLock}; + +use bitcoin::secp256k1::PublicKey; +use lightning::chain; +use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use lightning::ln::channelmanager::{ChannelManager, InterceptId}; +use lightning::ln::msgs::{ + ChannelMessageHandler, ErrorAction, LightningError, OnionMessageHandler, RoutingMessageHandler, +}; +use lightning::ln::peer_handler::{ + APeerManager, CustomMessageHandler, PeerManager, SocketDescriptor, +}; +use lightning::routing::gossip::NetworkGraph; +use lightning::routing::router::Router; +use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; +use lightning::util::errors::APIError; +use lightning::util::logger::{Level, Logger}; + +use crate::events::EventQueue; +use crate::jit_channel::msgs::Message; +use crate::jit_channel::scid_utils; +use crate::transport::message_handler::ProtocolMessageHandler; +use crate::transport::msgs::{LSPSMessage, RequestId}; +use crate::utils; +use crate::{events::Event, transport::msgs::ResponseError}; + +use super::msgs::{ + BuyRequest, BuyResponse, GetInfoRequest, GetInfoResponse, GetVersionsRequest, + GetVersionsResponse, OpeningFeeParams, RawOpeningFeeParams, Request, Response, +}; + +const SUPPORTED_SPEC_VERSION: u16 = 1; + +#[derive(PartialEq)] +enum JITChannelState { + VersionsRequested, + MenuRequested, + PendingMenuSelection, + BuyRequested, + PendingPayment, + Ready, +} + +struct JITChannel { + id: u128, + user_id: u128, + state: JITChannelState, + fees: Option, + token: Option, + min_payment_size_msat: Option, + max_payment_size_msat: Option, + payment_size_msat: Option, + counterparty_node_id: PublicKey, + amt_to_forward_msat: Option, + intercept_id: Option, + scid: Option, + lsp_cltv_expiry_delta: Option, +} + +impl JITChannel { + pub fn new( + id: u128, counterparty_node_id: PublicKey, user_id: u128, payment_size_msat: Option, + token: Option, + ) -> Self { + Self { + id, + counterparty_node_id, + token, + user_id, + state: JITChannelState::VersionsRequested, + fees: None, + min_payment_size_msat: None, + max_payment_size_msat: None, + payment_size_msat, + scid: None, + amt_to_forward_msat: None, + lsp_cltv_expiry_delta: None, + intercept_id: None, + } + } +} + +#[derive(Default)] +struct PeerState { + channels_by_id: HashMap, + request_to_cid: HashMap, + pending_requests: HashMap, +} + +impl PeerState { + pub fn insert_channel(&mut self, channel_id: u128, channel: JITChannel) { + self.channels_by_id.insert(channel_id, channel); + } + + pub fn insert_request(&mut self, request_id: RequestId, channel_id: u128) { + self.request_to_cid.insert(request_id, channel_id); + } + + pub fn get_channel_in_state_for_request( + &mut self, request_id: &RequestId, state: JITChannelState, + ) -> Option<&mut JITChannel> { + let channel_id = self.request_to_cid.remove(request_id)?; + + if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { + if channel.state == state { + return Some(channel); + } + } + None + } + + pub fn remove_channel(&mut self, channel_id: u128) { + self.channels_by_id.remove(&channel_id); + } +} + +pub struct JITChannelManager< + ES: Deref, + M: Deref, + T: Deref, + F: Deref, + R: Deref, + SP: Deref, + Descriptor: SocketDescriptor, + L: Deref, + RM: Deref, + CM: Deref, + OM: Deref, + CMH: Deref, + NS: Deref, +> where + ES::Target: EntropySource, + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + R::Target: Router, + SP::Target: SignerProvider, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + entropy_source: ES, + peer_manager: Mutex>>>, + channel_manager: Arc>, + pending_messages: Arc>>, + pending_events: Arc, + per_peer_state: RwLock>>, + channels_by_scid: RwLock>, + promise_secret: [u8; 32], +} + +impl< + ES: Deref, + M: Deref, + T: Deref, + F: Deref, + R: Deref, + SP: Deref, + Descriptor: SocketDescriptor, + L: Deref, + RM: Deref, + CM: Deref, + OM: Deref, + CMH: Deref, + NS: Deref, + > JITChannelManager +where + ES::Target: EntropySource, + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + R::Target: Router, + SP::Target: SignerProvider, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + pub(crate) fn new( + entropy_source: ES, promise_secret: [u8; 32], + pending_messages: Arc>>, + pending_events: Arc, + channel_manager: Arc>, + ) -> Self { + Self { + entropy_source, + promise_secret, + pending_messages, + pending_events, + per_peer_state: RwLock::new(HashMap::new()), + channels_by_scid: RwLock::new(HashMap::new()), + peer_manager: Mutex::new(None), + channel_manager, + } + } + + pub fn set_peer_manager( + &self, peer_manager: Arc>, + ) { + *self.peer_manager.lock().unwrap() = Some(peer_manager); + } + + pub fn create_invoice( + &self, counterparty_node_id: PublicKey, payment_size_msat: Option, + token: Option, user_channel_id: u128, + ) { + let channel_id = self.generate_channel_id(); + let channel = JITChannel::new( + channel_id, + counterparty_node_id, + user_channel_id, + payment_size_msat, + token, + ); + + let mut per_peer_state = self.per_peer_state.write().unwrap(); + let peer_state_mutex = + per_peer_state.entry(counterparty_node_id).or_insert(Mutex::new(PeerState::default())); + let peer_state = peer_state_mutex.get_mut().unwrap(); + peer_state.insert_channel(channel_id, channel); + + let request_id = self.generate_request_id(); + peer_state.insert_request(request_id.clone(), channel_id); + + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + counterparty_node_id, + Message::Request(request_id, Request::GetVersions(GetVersionsRequest {})).into(), + )); + } + + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } + + pub fn opening_fee_params_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, + opening_fee_params_menu: Vec, min_payment_size_msat: u64, + max_payment_size_msat: u64, + ) -> Result<(), APIError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + + match per_peer_state.get(&counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state.pending_requests.remove(&request_id) { + Some(Request::GetInfo(_)) => { + let response = Response::GetInfo(GetInfoResponse { + opening_fee_params_menu: opening_fee_params_menu + .into_iter() + .map(|param| param.into_opening_fee_params(&self.promise_secret)) + .collect(), + min_payment_size_msat, + max_payment_size_msat, + }); + self.enqueue_response(counterparty_node_id, request_id, response); + Ok(()) + } + _ => Err(APIError::APIMisuseError { + err: format!( + "No pending get_info request for request_id: {:?}", + request_id + ), + }), + } + } + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), + } + } + + pub fn opening_fee_params_selected( + &self, counterparty_node_id: PublicKey, channel_id: u128, + opening_fee_params: OpeningFeeParams, + ) -> Result<(), APIError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(&counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + if let Some(channel) = peer_state.channels_by_id.get_mut(&channel_id) { + if channel.state == JITChannelState::PendingMenuSelection { + channel.state = JITChannelState::BuyRequested; + channel.fees = Some(opening_fee_params.clone()); + + let request_id = self.generate_request_id(); + let payment_size_msat = channel.payment_size_msat; + peer_state.insert_request(request_id.clone(), channel_id); + + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + counterparty_node_id, + Message::Request( + request_id, + Request::Buy(BuyRequest { + version: SUPPORTED_SPEC_VERSION, + opening_fee_params, + payment_size_msat, + }), + ) + .into(), + )); + } + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } else { + return Err(APIError::APIMisuseError { + err: "Channel is not pending menu selection".to_string(), + }); + } + } else { + return Err(APIError::APIMisuseError { + err: format!("Channel with id {} not found", channel_id), + }); + } + } + None => { + return Err(APIError::APIMisuseError { + err: format!("No existing state with counterparty {}", counterparty_node_id), + }) + } + } + + Ok(()) + } + + pub fn invoice_parameters_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, scid: u64, + cltv_expiry_delta: u32, client_trusts_lsp: bool, + ) -> Result<(), APIError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + + match per_peer_state.get(&counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state.pending_requests.remove(&request_id) { + Some(Request::Buy(buy_request)) => { + let mut channels_by_scid = self.channels_by_scid.write().unwrap(); + channels_by_scid.insert( + scid, + JITChannel { + id: 0, + user_id: 0, + state: JITChannelState::BuyRequested, + fees: Some(buy_request.opening_fee_params), + token: None, + min_payment_size_msat: None, + max_payment_size_msat: None, + payment_size_msat: buy_request.payment_size_msat, + counterparty_node_id, + scid: Some(scid), + lsp_cltv_expiry_delta: Some(cltv_expiry_delta), + amt_to_forward_msat: None, + intercept_id: None, + }, + ); + + let block = scid_utils::block_from_scid(&scid); + let tx_index = scid_utils::tx_index_from_scid(&scid); + let vout = scid_utils::vout_from_scid(&scid); + + let jit_channel_scid = format!("{}x{}x{}", block, tx_index, vout); + + self.enqueue_response( + counterparty_node_id, + request_id, + Response::Buy(BuyResponse { + jit_channel_scid, + lsp_cltv_expiry_delta: cltv_expiry_delta, + client_trusts_lsp, + }), + ); + + Ok(()) + } + _ => Err(APIError::APIMisuseError { + err: format!("No pending buy request for request_id: {:?}", request_id), + }), + } + } + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), + } + } + + // need to decide if we should ignore, enqueue OpenChannel event, or enqueue FailInterceptedHTLC event + pub(crate) fn htlc_intercepted( + &self, scid: u64, intercept_id: InterceptId, inbound_amount_msat: u64, + expected_outbound_amount_msat: u64, + ) -> Result<(), APIError> { + let mut channels_by_scid = self.channels_by_scid.write().unwrap(); + + if let Some(channel) = channels_by_scid.get_mut(&scid) { + if let Some(fees) = &channel.fees { + let opening_fee_msat = utils::compute_opening_fee( + expected_outbound_amount_msat, + fees.min_fee_msat, + fees.proportional as u64, + ); + + if let Some(opening_fee_msat) = opening_fee_msat { + let amt_to_forward_msat = expected_outbound_amount_msat - opening_fee_msat; + channel.amt_to_forward_msat = Some(amt_to_forward_msat); + channel.intercept_id = Some(intercept_id); + + self.enqueue_event(Event::LSPS2(crate::JITChannelEvent::OpenChannel { + their_network_key: channel.counterparty_node_id, + inbound_amount_msat, + expected_outbound_amount_msat, + amt_to_forward_msat, + opening_fee_msat, + user_channel_id: scid as u128, + })); + } else { + self.channel_manager.fail_intercepted_htlc(intercept_id)?; + } + } else { + self.channel_manager.fail_intercepted_htlc(intercept_id)?; + } + } + + Ok(()) + } + + // figure out which intercept id is waiting on this channel and enqueue ForwardInterceptedHTLC event + pub(crate) fn channel_ready( + &self, user_channel_id: u128, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, + ) -> Result<(), APIError> { + let channels_by_scid = self.channels_by_scid.read().unwrap(); + + if let Ok(scid) = user_channel_id.try_into() { + if let Some(channel) = channels_by_scid.get(&scid) { + self.channel_manager.forward_intercepted_htlc( + channel.intercept_id.unwrap(), + channel_id, + *counterparty_node_id, + channel.amt_to_forward_msat.unwrap(), + )?; + } else { + return Err(APIError::APIMisuseError { + err: format!( + "Could not find a channel with user_channel_id {}", + user_channel_id + ), + }); + } + } else { + return Err(APIError::APIMisuseError { + err: format!("Could not parse user_channel_id into u64 scid {}", user_channel_id), + }); + } + + Ok(()) + } + + fn generate_channel_id(&self) -> u128 { + let bytes = self.entropy_source.get_secure_random_bytes(); + let mut id_bytes: [u8; 16] = [0; 16]; + id_bytes.copy_from_slice(&bytes[0..16]); + u128::from_be_bytes(id_bytes) + } + + fn generate_request_id(&self) -> RequestId { + let bytes = self.entropy_source.get_secure_random_bytes(); + RequestId(utils::hex_str(&bytes[0..16])) + } + + fn enqueue_response( + &self, counterparty_node_id: PublicKey, request_id: RequestId, response: Response, + ) { + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages + .push((counterparty_node_id, Message::Response(request_id, response).into())); + } + + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } + + fn enqueue_event(&self, event: Event) { + self.pending_events.enqueue(event); + } + + fn handle_get_versions_request( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, + ) -> Result<(), LightningError> { + // not sure best way to extract a vec to a constant? lazy_static? + self.enqueue_response( + *counterparty_node_id, + request_id, + Response::GetVersions(GetVersionsResponse { versions: vec![1] }), + ); + Ok(()) + } + + fn handle_get_versions_response( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetVersionsResponse, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state.get_channel_in_state_for_request( + &request_id, + JITChannelState::VersionsRequested, + ) { + Some(channel) => { + let channel_id = channel.id; + let token = channel.token.clone(); + + if result.versions.contains(&SUPPORTED_SPEC_VERSION) { + channel.state = JITChannelState::MenuRequested; + + let request_id = self.generate_request_id(); + peer_state.insert_request(request_id.clone(), channel_id); + + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + *counterparty_node_id, + Message::Request( + request_id, + Request::GetInfo(GetInfoRequest { + version: SUPPORTED_SPEC_VERSION, + token, + }), + ) + .into(), + )); + } + + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } else { + peer_state.remove_channel(channel_id); + } + } + None => { + return Err(LightningError { + err: format!( + "Received get_versions response without a matching channel: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + } + None => { + return Err(LightningError { + err: format!( + "Received get_versions response from unknown peer: {:?}", + counterparty_node_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + + Ok(()) + } + + fn handle_get_info_request( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: GetInfoRequest, + ) -> Result<(), LightningError> { + let mut per_peer_state = self.per_peer_state.write().unwrap(); + let peer_state_mutex: &mut Mutex = + per_peer_state.entry(*counterparty_node_id).or_insert(Mutex::new(PeerState::default())); + let peer_state = peer_state_mutex.get_mut().unwrap(); + peer_state.pending_requests.insert(request_id.clone(), Request::GetInfo(params.clone())); + + self.enqueue_event(Event::LSPS2(super::event::Event::GetInfo { + request_id, + counterparty_node_id: *counterparty_node_id, + version: params.version, + token: params.token, + })); + Ok(()) + } + + fn handle_get_info_response( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::MenuRequested) + { + Some(channel) => { + channel.state = JITChannelState::PendingMenuSelection; + channel.min_payment_size_msat = Some(result.min_payment_size_msat); + channel.max_payment_size_msat = Some(result.max_payment_size_msat); + + self.enqueue_event(Event::LSPS2(super::event::Event::GetInfoResponse { + counterparty_node_id: *counterparty_node_id, + opening_fee_params_menu: result.opening_fee_params_menu, + min_payment_size_msat: result.min_payment_size_msat, + max_payment_size_msat: result.max_payment_size_msat, + channel_id: channel.id, + user_channel_id: channel.user_id, + })); + } + None => { + return Err(LightningError { + err: format!( + "Received get_info response without a matching channel: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + } + None => { + return Err(LightningError { + err: format!( + "Received get_info response from unknown peer: {:?}", + counterparty_node_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + + Ok(()) + } + + fn handle_get_info_error( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) + { + Some(channel) => { + let channel_id = channel.id; + peer_state.remove_channel(channel_id); + return Err(LightningError { + err: format!("Received error response from getinfo request ({:?}) with counterparty {:?}. Removing channel {}. code = {}, message = {}", request_id, counterparty_node_id, channel_id, error.code, error.message), + action: ErrorAction::IgnoreAndLog(Level::Info) + }); + } + None => { + return Err(LightningError { + err: format!("Received an unexpected error response for a getinfo request from counterparty ({:?})", counterparty_node_id), + action: ErrorAction::IgnoreAndLog(Level::Info) + }); + } + } + } + None => { + return Err(LightningError { + err: format!("Received error response for a getinfo request from an unknown counterparty ({:?})", counterparty_node_id), + action: ErrorAction::IgnoreAndLog(Level::Info) + }); + } + } + } + + fn handle_buy_request( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest, + ) -> Result<(), LightningError> { + if params.opening_fee_params.is_valid(&self.promise_secret) { + let mut per_peer_state = self.per_peer_state.write().unwrap(); + let peer_state_mutex = per_peer_state + .entry(*counterparty_node_id) + .or_insert(Mutex::new(PeerState::default())); + let peer_state = peer_state_mutex.get_mut().unwrap(); + peer_state.pending_requests.insert(request_id.clone(), Request::Buy(params.clone())); + + self.enqueue_event(Event::LSPS2(super::event::Event::BuyRequest { + request_id, + version: params.version, + counterparty_node_id: *counterparty_node_id, + opening_fee_params: params.opening_fee_params, + payment_size_msat: params.payment_size_msat, + })); + } + Ok(()) + } + + fn handle_buy_response( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: BuyResponse, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) + { + Some(channel) => { + channel.state = JITChannelState::PendingPayment; + channel.lsp_cltv_expiry_delta = Some(result.lsp_cltv_expiry_delta); + + if let Ok(scid) = + scid_utils::scid_from_human_readable_string(&result.jit_channel_scid) + { + channel.scid = Some(scid); + + self.enqueue_event(Event::LSPS2( + super::event::Event::InvoiceGenerationReady { + counterparty_node_id: *counterparty_node_id, + scid, + cltv_expiry_delta: result.lsp_cltv_expiry_delta, + min_payment_size_msat: channel.min_payment_size_msat, + max_payment_size_msat: channel.max_payment_size_msat, + payment_size_msat: channel.payment_size_msat, + fees: channel.fees.clone().unwrap(), + client_trusts_lsp: result.client_trusts_lsp, + user_channel_id: channel.user_id, + }, + )); + } else { + return Err(LightningError { + err: format!( + "Received buy response with an invalid scid {}", + result.jit_channel_scid + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }); + } + } + None => { + return Err(LightningError { + err: format!( + "Received buy response without a matching channel: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }); + } + } + } + None => { + return Err(LightningError { + err: format!( + "Received buy response from unknown peer: {:?}", + counterparty_node_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }); + } + } + Ok(()) + } + + fn handle_buy_error( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) + { + Some(channel) => { + let channel_id = channel.id; + peer_state.remove_channel(channel_id); + return Err(LightningError { err: format!( "Received error response from buy request ({:?}) with counterparty {:?}. Removing channel {}. code = {}, message = {}", request_id, counterparty_node_id, channel_id, error.code, error.message), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + None => { + return Err(LightningError { err: format!("Received an unexpected error response for a buy request from counterparty ({:?})", counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + } + } + None => { + return Err(LightningError { err: format!("Received error response for a buy request from an unknown counterparty ({:?})",counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + } + } +} + +impl< + ES: Deref, + M: Deref, + T: Deref, + F: Deref, + R: Deref, + SP: Deref, + Descriptor: SocketDescriptor, + L: Deref, + RM: Deref, + CM: Deref, + OM: Deref, + CMH: Deref, + NS: Deref, + > ProtocolMessageHandler + for JITChannelManager +where + ES::Target: EntropySource, + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + R::Target: Router, + SP::Target: SignerProvider, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + type ProtocolMessage = Message; + const PROTOCOL_NUMBER: Option = Some(2); + + fn handle_message( + &self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey, + ) -> Result<(), LightningError> { + match message { + Message::Request(request_id, request) => match request { + super::msgs::Request::GetVersions(_) => { + self.handle_get_versions_request(request_id, counterparty_node_id) + } + super::msgs::Request::GetInfo(params) => { + self.handle_get_info_request(request_id, counterparty_node_id, params) + } + super::msgs::Request::Buy(params) => { + self.handle_buy_request(request_id, counterparty_node_id, params) + } + }, + Message::Response(request_id, response) => match response { + super::msgs::Response::GetVersions(result) => { + self.handle_get_versions_response(request_id, counterparty_node_id, result) + } + super::msgs::Response::GetInfo(result) => { + self.handle_get_info_response(request_id, counterparty_node_id, result) + } + super::msgs::Response::GetInfoError(error) => { + self.handle_get_info_error(request_id, counterparty_node_id, error) + } + super::msgs::Response::Buy(result) => { + self.handle_buy_response(request_id, counterparty_node_id, result) + } + super::msgs::Response::BuyError(error) => { + self.handle_buy_error(request_id, counterparty_node_id, error) + } + }, + } + } +} diff --git a/src/jit_channel/event.rs b/src/jit_channel/event.rs new file mode 100644 index 0000000..a86b7c0 --- /dev/null +++ b/src/jit_channel/event.rs @@ -0,0 +1,110 @@ +// This file is Copyright its original authors, visible in version contror +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +use bitcoin::secp256k1::PublicKey; + +use super::msgs::OpeningFeeParams; +use crate::transport::msgs::RequestId; + +/// An Event which you should probably take some action in response to. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Event { + /// A request from a client for information about JIT Channel parameters. + /// + /// You must calculate the paramaeters for this client and pass them to + /// [`crate::LiquidityManager::opening_fee_params_generated`]. + GetInfo { + /// An identifier that must be passed to [`crate::LiquidityManager::opening_fee_params_generated`]. + request_id: RequestId, + /// The node id of the client making the information request. + counterparty_node_id: PublicKey, + /// The protocol version they would like to use. + version: u16, + /// An optional token that can be used as an api key, coupon code, etc. + token: Option, + }, + /// Information from LSP about their current fee and channel parameters. + /// + /// You must call [`crate::LiquidityManager::opening_fee_params_selected`] with the fee parameter + /// you want to use if you wish to proceed opening a channel. + GetInfoResponse { + /// Needs to be passed to [`crate::LiquidityManager::opening_fee_params_selected`]. + channel_id: u128, + /// The node id of the LSP that provided this response. + counterparty_node_id: PublicKey, + /// The menu of fee parameters the LSP is offering at this time. + /// You must select one of these if you wish to proceed. + opening_fee_params_menu: Vec, + /// The min payment size allowed when opening the channel. + min_payment_size_msat: u64, + /// The max payment size allowed when opening the channel. + max_payment_size_msat: u64, + /// The user_channel_id value passed in to [`crate::LiquidityManager::create_invoice`]. + user_channel_id: u128, + }, + /// A client has selected a opening fee parameter to use and would like to + /// purchase a channel with an optional initial payment size. + /// + /// If payment_size_msat is [`Option::Some`] then the payer is allowed to use MPP + /// If payment_size_msat is [`Option::None`] then the payer cannot use MPP + /// + /// You must generate an scid and cltv_expiry_delta for them to use + /// and call [`crate::LiquidityManager::invoice_parameters_generated`]. + BuyRequest { + /// An identifier that must be passed into [`crate::LiquidityManager::invoice_parameters_generated`]. + request_id: RequestId, + /// The client node id that is making this request. + counterparty_node_id: PublicKey, + /// The version of the protocol they would like to use. + version: u16, + /// The channel parameters they have selected. + opening_fee_params: OpeningFeeParams, + /// The size of the initial payment they would like to receive. + payment_size_msat: Option, + }, + /// Use the provided fields to generate an invoice and give to payer. + /// + /// When the invoice is paid the LSP will open a channel to you + /// with the previously agreed upon parameters. + InvoiceGenerationReady { + /// The node id of the LSP. + counterparty_node_id: PublicKey, + /// The short channel id to use in the route hint. + scid: u64, + /// The cltv_expiry_delta to use in the route hint. + cltv_expiry_delta: u32, + /// The agreed upon channel parameters. + fees: OpeningFeeParams, + /// The min payment size allowed. + min_payment_size_msat: Option, + /// The max payment size allowed. + max_payment_size_msat: Option, + /// The initial payment size you specified. + payment_size_msat: Option, + /// The trust model the lsp expects. + client_trusts_lsp: bool, + /// The user_channel_id value passed in to [`crate::LiquidityManager::create_invoice`]. + user_channel_id: u128, + }, + /// You should open a channel using [`lightning::ln::channelmanager::ChannelManager::create_channel`]. + OpenChannel { + /// The node to open channel with + their_network_key: PublicKey, + /// The intercepted htlc amount in msats + inbound_amount_msat: u64, + /// The amount the client expects to receive before fees are taken out + expected_outbound_amount_msat: u64, + /// The amount to forward after fees + amt_to_forward_msat: u64, + /// The fee earned for opening the channel + opening_fee_msat: u64, + /// An internal id used to track channel open + user_channel_id: u128, + }, +} diff --git a/src/jit_channel/mod.rs b/src/jit_channel/mod.rs index 0d948d9..6f871ea 100644 --- a/src/jit_channel/mod.rs +++ b/src/jit_channel/mod.rs @@ -7,4 +7,8 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! Types and primitives that implement the LSPS3: JIT Channel Negotiation specification. +//! Types and primitives that implement the LSPS2: JIT Channel Negotiation specification. +pub mod channel_manager; +pub mod event; +pub mod msgs; +pub mod scid_utils; diff --git a/src/jit_channel/msgs.rs b/src/jit_channel/msgs.rs new file mode 100644 index 0000000..11bad48 --- /dev/null +++ b/src/jit_channel/msgs.rs @@ -0,0 +1,252 @@ +use std::convert::TryFrom; + +use bitcoin::hashes::hmac::{Hmac, HmacEngine}; +use bitcoin::hashes::sha256::Hash as Sha256; +use bitcoin::hashes::{Hash, HashEngine}; +use serde::{Deserialize, Serialize}; + +use crate::transport::msgs::{LSPSMessage, RequestId, ResponseError}; +use crate::utils; + +pub(crate) const LSPS2_GETVERSIONS_METHOD_NAME: &str = "lsps2.getversions"; +pub(crate) const LSPS2_GETINFO_METHOD_NAME: &str = "lsps2.getinfo"; +pub(crate) const LSPS2_BUY_METHOD_NAME: &str = "lsps2.buy"; + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Default)] +#[serde(default)] +pub struct GetVersionsRequest {} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct GetVersionsResponse { + pub versions: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct GetInfoRequest { + pub version: u16, + pub token: Option, +} + +/// Fees and parameters for a JIT Channel. +/// +/// The client will pay max(min_fee_msat, proportional*(payment_size_msat/1_000_000)). +pub struct RawOpeningFeeParams { + /// The minimum fee required for the channel open. + pub min_fee_msat: u64, + /// A fee proportional to the size of the initial payment. + pub proportional: u32, + /// An ISO8601 formatted date for which these params are valid. + pub valid_until: String, + /// number of blocks that the LSP promises it will keep the channel alive without closing, after confirmation. + pub min_lifetime: u32, + /// Maximum number of blocks that the client is allowed to set its to_self_delay parameter. + pub max_client_to_self_delay: u32, +} + +impl RawOpeningFeeParams { + pub(crate) fn into_opening_fee_params(self, promise_secret: &[u8; 32]) -> OpeningFeeParams { + let mut hmac = HmacEngine::::new(promise_secret); + hmac.input(&self.min_fee_msat.to_be_bytes()); + hmac.input(&self.proportional.to_be_bytes()); + hmac.input(self.valid_until.as_bytes()); + hmac.input(&self.min_lifetime.to_be_bytes()); + hmac.input(&self.max_client_to_self_delay.to_be_bytes()); + let promise_bytes = Hmac::from_engine(hmac).into_inner(); + let promise = utils::hex_str(&promise_bytes[..]); + OpeningFeeParams { + min_fee_msat: self.min_fee_msat, + proportional: self.proportional, + valid_until: self.valid_until.clone(), + min_lifetime: self.min_lifetime, + max_client_to_self_delay: self.max_client_to_self_delay, + promise, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +/// Fees and parameters for a JIT Channel with promise. +/// +/// The client will pay max(min_fee_msat, proportional*(payment_size_msat/1_000_000)). +pub struct OpeningFeeParams { + /// The minimum fee required for the channel open. + pub min_fee_msat: u64, + /// A fee proportional to the size of the initial payment. + pub proportional: u32, + /// An ISO8601 formatted date for which these params are valid. + pub valid_until: String, + /// number of blocks that the LSP promises it will keep the channel alive without closing, after confirmation. + pub min_lifetime: u32, + /// Maximum number of blocks that the client is allowed to set its to_self_delay parameter. + pub max_client_to_self_delay: u32, + /// Field used by the LSP to validate that these parameters were actually given out by them. + pub promise: String, +} + +impl OpeningFeeParams { + /// Determine that these parameters are valid given the secret used to generate the promise. + pub fn is_valid(&self, promise_secret: &[u8; 32]) -> bool { + let mut hmac = HmacEngine::::new(promise_secret); + hmac.input(&self.min_fee_msat.to_be_bytes()); + hmac.input(&self.proportional.to_be_bytes()); + hmac.input(self.valid_until.as_bytes()); + hmac.input(&self.min_lifetime.to_be_bytes()); + hmac.input(&self.max_client_to_self_delay.to_be_bytes()); + let promise_bytes = Hmac::from_engine(hmac).into_inner(); + let promise = utils::hex_str(&promise_bytes[..]); + promise == self.promise + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct GetInfoResponse { + pub opening_fee_params_menu: Vec, + pub min_payment_size_msat: u64, + pub max_payment_size_msat: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct BuyRequest { + pub version: u16, + pub opening_fee_params: OpeningFeeParams, + #[serde(skip_serializing_if = "Option::is_none")] + pub payment_size_msat: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct BuyResponse { + pub jit_channel_scid: String, + pub lsp_cltv_expiry_delta: u32, + #[serde(default)] + pub client_trusts_lsp: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Request { + GetVersions(GetVersionsRequest), + GetInfo(GetInfoRequest), + Buy(BuyRequest), +} + +impl Request { + pub fn method(&self) -> &str { + match self { + Request::GetVersions(_) => LSPS2_GETVERSIONS_METHOD_NAME, + Request::GetInfo(_) => LSPS2_GETINFO_METHOD_NAME, + Request::Buy(_) => LSPS2_BUY_METHOD_NAME, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Response { + GetVersions(GetVersionsResponse), + GetInfo(GetInfoResponse), + GetInfoError(ResponseError), + Buy(BuyResponse), + BuyError(ResponseError), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Message { + Request(RequestId, Request), + Response(RequestId, Response), +} + +impl TryFrom for Message { + type Error = (); + + fn try_from(message: LSPSMessage) -> Result { + if let LSPSMessage::LSPS2(message) = message { + return Ok(message); + } + + Err(()) + } +} + +impl From for LSPSMessage { + fn from(message: Message) -> Self { + LSPSMessage::LSPS2(message) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn into_opening_fee_params_produces_valid_promise() { + let min_fee_msat = 100; + let proportional = 21; + let valid_until = "2023-05-20".to_string(); + let min_lifetime = 144; + let max_client_to_self_delay = 128; + + let raw = RawOpeningFeeParams { + min_fee_msat, + proportional, + valid_until: valid_until.clone(), + min_lifetime, + max_client_to_self_delay, + }; + + let promise_secret = [1u8; 32]; + + let opening_fee_params = raw.into_opening_fee_params(&promise_secret); + + assert_eq!(opening_fee_params.min_fee_msat, min_fee_msat); + assert_eq!(opening_fee_params.proportional, proportional); + assert_eq!(opening_fee_params.valid_until, valid_until); + assert_eq!(opening_fee_params.min_lifetime, min_lifetime); + assert_eq!(opening_fee_params.max_client_to_self_delay, max_client_to_self_delay); + + assert!(opening_fee_params.is_valid(&promise_secret)); + } + + #[test] + fn changing_single_field_produced_invalid_params() { + let min_fee_msat = 100; + let proportional = 21; + let valid_until = "2023-05-20".to_string(); + let min_lifetime = 144; + let max_client_to_self_delay = 128; + + let raw = RawOpeningFeeParams { + min_fee_msat, + proportional, + valid_until, + min_lifetime, + max_client_to_self_delay, + }; + + let promise_secret = [1u8; 32]; + + let mut opening_fee_params = raw.into_opening_fee_params(&promise_secret); + opening_fee_params.min_fee_msat = min_fee_msat + 1; + assert!(!opening_fee_params.is_valid(&promise_secret)); + } + + #[test] + fn wrong_secret_produced_invalid_params() { + let min_fee_msat = 100; + let proportional = 21; + let valid_until = "2023-05-20".to_string(); + let min_lifetime = 144; + let max_client_to_self_delay = 128; + + let raw = RawOpeningFeeParams { + min_fee_msat, + proportional, + valid_until, + min_lifetime, + max_client_to_self_delay, + }; + + let promise_secret = [1u8; 32]; + let other_secret = [2u8; 32]; + + let opening_fee_params = raw.into_opening_fee_params(&promise_secret); + assert!(!opening_fee_params.is_valid(&other_secret)); + } +} diff --git a/src/jit_channel/scid_utils.rs b/src/jit_channel/scid_utils.rs new file mode 100644 index 0000000..ce83457 --- /dev/null +++ b/src/jit_channel/scid_utils.rs @@ -0,0 +1,71 @@ +#[derive(Debug, PartialEq, Eq)] +pub enum ShortChannelIdError { + InvalidScid, +} + +/// Maximum transaction index that can be used in a `short_channel_id`. +/// This value is based on the 3-bytes available for tx index. +pub const MAX_SCID_TX_INDEX: u64 = 0x00ffffff; + +/// Maximum vout index that can be used in a `short_channel_id`. This +/// value is based on the 2-bytes available for the vout index. +pub const MAX_SCID_VOUT_INDEX: u64 = 0xffff; + +/// Extracts the block height (most significant 3-bytes) from the `short_channel_id` +pub fn block_from_scid(short_channel_id: &u64) -> u32 { + (short_channel_id >> 40) as u32 +} + +/// Extracts the tx index (bytes [2..4]) from the `short_channel_id` +pub fn tx_index_from_scid(short_channel_id: &u64) -> u32 { + ((short_channel_id >> 16) & MAX_SCID_TX_INDEX) as u32 +} + +/// Extracts the vout (bytes [0..2]) from the `short_channel_id` +pub fn vout_from_scid(short_channel_id: &u64) -> u16 { + ((short_channel_id) & MAX_SCID_VOUT_INDEX) as u16 +} + +pub fn scid_from_human_readable_string( + human_readable_scid: &str, +) -> Result { + let mut parts = human_readable_scid.split('x'); + + let block: u64 = parts + .next() + .ok_or(ShortChannelIdError::InvalidScid)? + .parse() + .map_err(|_e| ShortChannelIdError::InvalidScid)?; + let tx_index: u64 = parts + .next() + .ok_or(ShortChannelIdError::InvalidScid)? + .parse() + .map_err(|_e| ShortChannelIdError::InvalidScid)?; + let vout_index: u64 = parts + .next() + .ok_or(ShortChannelIdError::InvalidScid)? + .parse() + .map_err(|_e| ShortChannelIdError::InvalidScid)?; + + Ok((block << 40) | (tx_index << 16) | vout_index) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_human_readable_scid_correctly() { + let block = 140; + let tx_index = 123; + let vout = 22; + + let human_readable_scid = format!("{}x{}x{}", block, tx_index, vout); + + let scid = scid_from_human_readable_string(&human_readable_scid).unwrap(); + + assert_eq!(block_from_scid(&scid), block); + assert_eq!(tx_index_from_scid(&scid), tx_index); + assert_eq!(vout_from_scid(&scid), vout); + } +} diff --git a/src/lib.rs b/src/lib.rs index 4919017..f531a16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,4 +24,8 @@ mod jit_channel; mod transport; mod utils; -pub use transport::message_handler::{LiquidityManager, LiquidityProviderConfig}; +pub use jit_channel::event::Event as JITChannelEvent; +pub use jit_channel::msgs::{OpeningFeeParams, RawOpeningFeeParams}; +pub use transport::message_handler::{ + JITChannelsConfig, LiquidityManager, LiquidityProviderConfig, +}; diff --git a/src/transport/message_handler.rs b/src/transport/message_handler.rs index 2f28124..1a235b2 100644 --- a/src/transport/message_handler.rs +++ b/src/transport/message_handler.rs @@ -1,4 +1,6 @@ use crate::events::{Event, EventQueue}; +use crate::jit_channel::channel_manager::JITChannelManager; +use crate::jit_channel::msgs::{OpeningFeeParams, RawOpeningFeeParams}; use crate::transport::msgs::{LSPSMessage, RawLSPSMessage, LSPS_MESSAGE_TYPE}; use crate::transport::protocol::LSPS0MessageHandler; @@ -6,12 +8,16 @@ use bitcoin::secp256k1::PublicKey; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::channelmanager::InterceptId; use lightning::ln::features::{InitFeatures, NodeFeatures}; -use lightning::ln::msgs::{ErrorAction, LightningError}; -use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::msgs::{ + ChannelMessageHandler, ErrorAction, LightningError, OnionMessageHandler, RoutingMessageHandler, +}; +use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::ln::wire::CustomMessageReader; use lightning::routing::router::Router; use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; +use lightning::util::errors::APIError; use lightning::util::logger::{Level, Logger}; use lightning::util::ser::Readable; use std::collections::HashMap; @@ -20,6 +26,8 @@ use std::io; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use super::msgs::RequestId; + const LSPS_FEATURE_BIT: usize = 729; /// A trait used to implement a specific LSPS protocol. @@ -39,20 +47,47 @@ pub(crate) trait ProtocolMessageHandler { /// /// Allows end-user to configure options when using the [`LiquidityManager`] /// to provide liquidity services to clients. -pub struct LiquidityProviderConfig; +pub struct LiquidityProviderConfig { + /// Optional configuration for jit channels + /// should you want to support them + pub jit_channels: Option, +} + +/// Configuration options for jit channels +/// A configuration used for the creation of Just In Time Channels. +pub struct JITChannelsConfig { + /// Used to calculate the promise for channel parameters supplied to clients + /// + /// Note: If this changes then old promises given out will be considered invalid + pub promise_secret: [u8; 32], +} /// The main interface into LSP functionality. /// /// Should be used as a [`CustomMessageHandler`] for your /// [`lightning::ln::peer_handler::PeerManager`]'s [`lightning::ln::peer_handler::MessageHandler`]. +/// +/// Should provide a reference to your [`lightning::ln::peer_handler::PeerManager`] by calling +/// [`LiquidityManager::set_peer_manager()`] post construction. This allows the [`LiquidityManager`] to +/// wake the [`lightning::ln::peer_handler::PeerManager`] when there are pending messages to be sent. +/// +/// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events()`] in order to surface +/// [`Event`]'s that likely need to be handled. +/// +/// Users must forward the [`lightning::events::Event::HTLCIntercepted`] event parameters to [`LiquidityManager::htlc_intercepted()`] +/// and the [`lightning::events::Event::ChannelReady`] event parameters to [`LiquidityManager::channel_ready()`]. pub struct LiquidityManager< - ES: Deref, + ES: Deref + Clone, M: Deref, T: Deref, F: Deref, R: Deref, SP: Deref, + Descriptor: SocketDescriptor, L: Deref, + RM: Deref, + CM: Deref, + OM: Deref, NS: Deref, > where ES::Target: EntropySource, @@ -62,18 +97,35 @@ pub struct LiquidityManager< R::Target: Router, SP::Target: SignerProvider, L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, NS::Target: NodeSigner, { pending_messages: Arc>>, pending_events: Arc, request_id_to_method_map: Mutex>, lsps0_message_handler: LSPS0MessageHandler, + lsps2_message_handler: + Option, NS>>, provider_config: Option, channel_manager: Arc>, } -impl - LiquidityManager +impl< + ES: Deref + Clone, + M: Deref, + T: Deref, + F: Deref, + R: Deref, + SP: Deref, + Descriptor: SocketDescriptor, + L: Deref, + RM: Deref, + CM: Deref, + OM: Deref, + NS: Deref, + > LiquidityManager where ES::Target: EntropySource, M::Target: chain::Watch<::Signer>, @@ -82,9 +134,12 @@ where R::Target: Router, SP::Target: SignerProvider, L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, NS::Target: NodeSigner, { - /// Constructor for the LiquidityManager + /// Constructor for the [`LiquidityManager`] /// /// Sets up the required protocol message handlers based on the given [`LiquidityProviderConfig`]. pub fn new( @@ -93,17 +148,31 @@ where ) -> Self where { let pending_messages = Arc::new(Mutex::new(vec![])); + let pending_events = Arc::new(EventQueue::default()); let lsps0_message_handler = - LSPS0MessageHandler::new(entropy_source, vec![], Arc::clone(&pending_messages)); + LSPS0MessageHandler::new(entropy_source.clone(), vec![], Arc::clone(&pending_messages)); + + let lsps2_message_handler = provider_config.as_ref().and_then(|config| { + config.jit_channels.as_ref().map(|jit_channels_config| { + JITChannelManager::new( + entropy_source.clone(), + jit_channels_config.promise_secret, + Arc::clone(&pending_messages), + Arc::clone(&pending_events), + Arc::clone(&channel_manager), + ) + }) + }); Self { pending_messages, - pending_events: Arc::new(EventQueue::default()), + pending_events, request_id_to_method_map: Mutex::new(HashMap::new()), lsps0_message_handler, provider_config, channel_manager, + lsps2_message_handler, } } @@ -121,6 +190,165 @@ where { self.pending_events.get_and_clear_pending_events() } + /// Set a [`lightning::ln::peer_handler::PeerManager`] reference for the message handlers + /// + /// This allows the message handlers to wake the [`lightning::ln::peer_handler::PeerManager`] by calling + /// [`lightning::ln::peer_handler::PeerManager::process_events()`] after enqueing messages to be sent. + /// + /// Without this the messages will be sent based on whatever polling interval + /// your background processor uses. + pub fn set_peer_manager( + &self, peer_manager: Arc, NS>>, + ) { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.set_peer_manager(peer_manager); + } + } + + /// Initiate the creation of an invoice that when paid will open a channel + /// with enough inbound liquidity to be able to receive the payment. + /// + /// `counterparty_node_id` is the node_id of the LSP you would like to use. + /// + /// if `payment_size_msat` is [`Option::Some`] then the invoice will be for a fixed amount + /// and MPP can be used to pay it. + /// + /// if `payment_size_msat` is [`Option::None`] then the invoice can be for an arbitrary amount + /// but MPP can no longer be used to pay it. + /// + /// `token` is an optional String that will be provided to the LSP. + /// it can be used by the LSP as an API key, coupon code, or some other way to identify a user. + pub fn create_invoice( + &self, counterparty_node_id: PublicKey, payment_size_msat: Option, + token: Option, user_channel_id: u128, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.create_invoice( + counterparty_node_id, + payment_size_msat, + token, + user_channel_id, + ); + Ok(()) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Used by LSP to provide fee parameters to a client requesting a JIT Channel. + /// + /// Should be called in response to receiving a [`crate::JITChannelEvent::GetInfo`] event. + pub fn opening_fee_params_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, + opening_fee_params_menu: Vec, min_payment_size_msat: u64, + max_payment_size_msat: u64, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.opening_fee_params_generated( + counterparty_node_id, + request_id, + opening_fee_params_menu, + min_payment_size_msat, + max_payment_size_msat, + ) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Used by client to confirm which channel parameters to use for the JIT Channel buy request. + /// + /// Should be called in response to receiving a [`crate::JITChannelEvent::GetInfoResponse`] event. + pub fn opening_fee_params_selected( + &self, counterparty_node_id: PublicKey, channel_id: u128, + opening_fee_params: OpeningFeeParams, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.opening_fee_params_selected( + counterparty_node_id, + channel_id, + opening_fee_params, + ) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Used by LSP to provide client with the scid and cltv_expiry_delta to use in their invoice + /// + /// Should be called in response to receiving a [`crate::JITChannelEvent::BuyRequest`] event. + pub fn invoice_parameters_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, scid: u64, + cltv_expiry_delta: u32, client_trusts_lsp: bool, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.invoice_parameters_generated( + counterparty_node_id, + request_id, + scid, + cltv_expiry_delta, + client_trusts_lsp, + ) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Forward [`lightning::events::Event::HTLCIntercepted`] event parameters into this function. + /// + /// Will fail the intercepted HTLC if the scid matches a payment we are expecting + /// but the payment amount is incorrect or the expiry has passed. + /// + /// Will generate a [`crate::JITChannelEvent::OpenChannel`] event if the scid matches a payment we are expected + /// and the payment amount is correct and the offer has not expired. + /// + /// Will do nothing if the scid does not match any of the ones we gave out. + pub fn htlc_intercepted( + &self, scid: u64, intercept_id: InterceptId, inbound_amount_msat: u64, + expected_outbound_amount_msat: u64, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.htlc_intercepted( + scid, + intercept_id, + inbound_amount_msat, + expected_outbound_amount_msat, + )?; + } + + Ok(()) + } + + /// Forward [`lightning::events::Event::ChannelReady`] event parameters into this function. + /// + /// Will forward the intercepted HTLC if it matches a channel + /// we need to forward a payment over otherwise it will be ignored. + pub fn channel_ready( + &self, user_channel_id: u128, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.channel_ready( + user_channel_id, + channel_id, + counterparty_node_id, + )?; + } + + Ok(()) + } + fn handle_lsps_message( &self, msg: LSPSMessage, sender_node_id: &PublicKey, ) -> Result<(), lightning::ln::msgs::LightningError> { @@ -131,6 +359,14 @@ where { LSPSMessage::LSPS0(msg) => { self.lsps0_message_handler.handle_message(msg, sender_node_id)?; } + LSPSMessage::LSPS2(msg) => match &self.lsps2_message_handler { + Some(lsps2_message_handler) => { + lsps2_message_handler.handle_message(msg, sender_node_id)?; + } + None => { + return Err(LightningError { err: format!("Received LSPS2 message without LSPS2 message handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + }, } Ok(()) } @@ -141,16 +377,31 @@ where { } } -impl - CustomMessageReader for LiquidityManager +impl< + ES: Deref + Clone, + M: Deref, + T: Deref, + F: Deref, + R: Deref, + SP: Deref, + Descriptor: SocketDescriptor, + L: Deref, + RM: Deref, + CM: Deref, + OM: Deref, + NS: Deref, + > CustomMessageReader for LiquidityManager where ES::Target: EntropySource, + L::Target: Logger, M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, F::Target: FeeEstimator, R::Target: Router, SP::Target: SignerProvider, - L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, NS::Target: NodeSigner, { type CustomMessage = RawLSPSMessage; @@ -165,8 +416,20 @@ where } } -impl - CustomMessageHandler for LiquidityManager +impl< + ES: Deref + Clone, + M: Deref, + T: Deref, + F: Deref, + R: Deref, + SP: Deref, + Descriptor: SocketDescriptor, + L: Deref, + RM: Deref, + CM: Deref, + OM: Deref, + NS: Deref, + > CustomMessageHandler for LiquidityManager where ES::Target: EntropySource, M::Target: chain::Watch<::Signer>, @@ -175,14 +438,20 @@ where R::Target: Router, SP::Target: SignerProvider, L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, NS::Target: NodeSigner, { fn handle_custom_message( &self, msg: Self::CustomMessage, sender_node_id: &PublicKey, ) -> Result<(), lightning::ln::msgs::LightningError> { - let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap(); + let message = { + let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap(); + LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map) + }; - match LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map) { + match message { Ok(msg) => self.handle_lsps_message(msg, sender_node_id), Err(_) => { self.enqueue_message(*sender_node_id, LSPSMessage::Invalid); diff --git a/src/transport/msgs.rs b/src/transport/msgs.rs index fcd166c..1d3ff96 100644 --- a/src/transport/msgs.rs +++ b/src/transport/msgs.rs @@ -1,9 +1,11 @@ +use crate::jit_channel; use lightning::impl_writeable_msg; use lightning::ln::wire; use serde::de; use serde::de::{MapAccess, Visitor}; use serde::ser::SerializeStruct; use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::json; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; @@ -18,6 +20,7 @@ const JSONRPC_RESULT_FIELD_KEY: &str = "result"; const JSONRPC_ERROR_FIELD_KEY: &str = "error"; const JSONRPC_INVALID_MESSAGE_ERROR_CODE: i32 = -32700; const JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE: &str = "parse error"; + const LSPS0_LISTPROTOCOLS_METHOD_NAME: &str = "lsps0.listprotocols"; pub const LSPS_MESSAGE_TYPE: u16 = 37913; @@ -35,7 +38,13 @@ impl wire::Type for RawLSPSMessage { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Prefix { + LSPS0, + LSPS2, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct RequestId(pub String); #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] @@ -86,6 +95,7 @@ impl TryFrom for LSPS0Message { match message { LSPSMessage::Invalid => Err(()), LSPSMessage::LSPS0(message) => Ok(message), + LSPSMessage::LSPS2(_) => Err(()), } } } @@ -100,6 +110,7 @@ impl From for LSPSMessage { pub enum LSPSMessage { Invalid, LSPS0(LSPS0Message), + LSPS2(jit_channel::msgs::Message), } impl LSPSMessage { @@ -116,6 +127,9 @@ impl LSPSMessage { LSPSMessage::LSPS0(LSPS0Message::Request(request_id, request)) => { Some((request_id.0.clone(), request.method().to_string())) } + LSPSMessage::LSPS2(jit_channel::msgs::Message::Request(request_id, request)) => { + Some((request_id.0.clone(), request.method().to_string())) + } _ => None, } } @@ -154,6 +168,43 @@ impl Serialize for LSPSMessage { } } } + LSPSMessage::LSPS2(jit_channel::msgs::Message::Request(request_id, request)) => { + jsonrpc_object.serialize_field(JSONRPC_ID_FIELD_KEY, &request_id.0)?; + jsonrpc_object.serialize_field(JSONRPC_METHOD_FIELD_KEY, request.method())?; + + match request { + jit_channel::msgs::Request::GetVersions(params) => { + jsonrpc_object.serialize_field(JSONRPC_PARAMS_FIELD_KEY, params)? + } + jit_channel::msgs::Request::GetInfo(params) => { + jsonrpc_object.serialize_field(JSONRPC_PARAMS_FIELD_KEY, params)? + } + jit_channel::msgs::Request::Buy(params) => { + jsonrpc_object.serialize_field(JSONRPC_PARAMS_FIELD_KEY, params)? + } + } + } + LSPSMessage::LSPS2(jit_channel::msgs::Message::Response(request_id, response)) => { + jsonrpc_object.serialize_field(JSONRPC_ID_FIELD_KEY, &request_id.0)?; + + match response { + jit_channel::msgs::Response::GetVersions(result) => { + jsonrpc_object.serialize_field(JSONRPC_RESULT_FIELD_KEY, result)? + } + jit_channel::msgs::Response::GetInfo(result) => { + jsonrpc_object.serialize_field(JSONRPC_RESULT_FIELD_KEY, result)? + } + jit_channel::msgs::Response::GetInfoError(error) => { + jsonrpc_object.serialize_field(JSONRPC_ERROR_FIELD_KEY, error)? + } + jit_channel::msgs::Response::Buy(result) => { + jsonrpc_object.serialize_field(JSONRPC_RESULT_FIELD_KEY, result)? + } + jit_channel::msgs::Response::BuyError(error) => { + jsonrpc_object.serialize_field(JSONRPC_ERROR_FIELD_KEY, error)? + } + } + } LSPSMessage::Invalid => { let error = ResponseError { code: JSONRPC_INVALID_MESSAGE_ERROR_CODE, @@ -224,6 +275,30 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { LSPS0Request::ListProtocols(ListProtocolsRequest {}), ))) } + jit_channel::msgs::LSPS2_GETVERSIONS_METHOD_NAME => { + let request = serde_json::from_value(params.unwrap_or(json!({}))) + .map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( + RequestId(id), + jit_channel::msgs::Request::GetVersions(request), + ))) + } + jit_channel::msgs::LSPS2_GETINFO_METHOD_NAME => { + let request = serde_json::from_value(params.unwrap_or(json!({}))) + .map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( + RequestId(id), + jit_channel::msgs::Request::GetInfo(request), + ))) + } + jit_channel::msgs::LSPS2_BUY_METHOD_NAME => { + let request = serde_json::from_value(params.unwrap_or(json!({}))) + .map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( + RequestId(id), + jit_channel::msgs::Request::Buy(request), + ))) + } _ => Err(de::Error::custom(format!( "Received request with unknown method: {}", method @@ -248,6 +323,52 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { Err(de::Error::custom("Received invalid JSON-RPC object: one of method, result, or error required")) } } + jit_channel::msgs::LSPS2_GETVERSIONS_METHOD_NAME => { + if let Some(result) = result { + let response = + serde_json::from_value(result).map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::GetVersions(response), + ))) + } else { + Err(de::Error::custom("Received invalid lsps2.getversions response.")) + } + } + jit_channel::msgs::LSPS2_GETINFO_METHOD_NAME => { + if let Some(error) = error { + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::GetInfoError(error), + ))) + } else if let Some(result) = result { + let response = + serde_json::from_value(result).map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::GetInfo(response), + ))) + } else { + Err(de::Error::custom("Received invalid JSON-RPC object: one of method, result, or error required")) + } + } + jit_channel::msgs::LSPS2_BUY_METHOD_NAME => { + if let Some(error) = error { + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::BuyError(error), + ))) + } else if let Some(result) = result { + let response = + serde_json::from_value(result).map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::Buy(response), + ))) + } else { + Err(de::Error::custom("Received invalid JSON-RPC object: one of method, result, or error required")) + } + } _ => Err(de::Error::custom(format!( "Received response for an unknown request method: {}", method diff --git a/src/utils.rs b/src/utils.rs index 067ce0b..da40131 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -67,3 +67,13 @@ pub fn parse_pubkey(pubkey_str: &str) -> Result { Ok(pubkey.unwrap()) } + +pub fn compute_opening_fee( + payment_size_msat: u64, opening_fee_min_fee_msat: u64, opening_fee_proportional: u64, +) -> Option { + let t1 = payment_size_msat.checked_mul(opening_fee_proportional)?; + let t2 = t1.checked_add(999999)?; + let t3 = t2.checked_div(1000000)?; + let t4 = std::cmp::max(t3, opening_fee_min_fee_msat); + Some(t4) +} From 45d9be893a1bcf1361ec1004b343eb0b8866fbec Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Tue, 8 Aug 2023 10:29:10 -0400 Subject: [PATCH 2/4] changes to make mutiny work --- Cargo.toml | 9 ++++----- src/jit_channel/msgs.rs | 10 ++++++++++ src/lib.rs | 3 ++- src/transport/message_handler.rs | 29 ++++++++++++++++++----------- src/transport/msgs.rs | 7 +++++-- 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eee987c..39d7af1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,10 @@ description = "Types and primitives to integrate a spec-compliant LSP with an LD # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lightning = { version = "0.0.116", features = ["max_level_trace", "std"] } -lightning-invoice = "0.24.0" -lightning-net-tokio = "0.0.116" +lightning = { version = "0.0.116", default-features = false, features = ["max_level_trace", "no-std", "grind_signatures"] } +lightning-invoice = { version = "0.24.0", default-features = false, features = ["no-std"] } -bitcoin = "0.29.0" +bitcoin = { version = "0.29.2", default-features = false, features = ["serde", "secp-recovery", "rand"] } -serde = { version = "1.0", default-features = false, features = ["derive", "alloc"] } +serde = { version = "^1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/jit_channel/msgs.rs b/src/jit_channel/msgs.rs index 11bad48..10bd18f 100644 --- a/src/jit_channel/msgs.rs +++ b/src/jit_channel/msgs.rs @@ -98,10 +98,14 @@ impl OpeningFeeParams { } } +/// Information about the parameters a LSP is willing to offer clients #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct GetInfoResponse { + /// A set of opening fee parameters. pub opening_fee_params_menu: Vec, + /// The minimum payment size required to open a channel. pub min_payment_size_msat: u64, + /// The maximum payment size the lsp will tolerate. pub max_payment_size_msat: u64, } @@ -113,10 +117,16 @@ pub struct BuyRequest { pub payment_size_msat: Option, } +/// A response from a buy request made by a client +/// +/// Includes information needed to construct an invoice. #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct BuyResponse { + /// The short channel id used by LSP to identify need to open channel. pub jit_channel_scid: String, + /// The locktime expiry delta the lsp requires. pub lsp_cltv_expiry_delta: u32, + /// A flag that indicates who is trusting who. #[serde(default)] pub client_trusts_lsp: bool, } diff --git a/src/lib.rs b/src/lib.rs index f531a16..e8c4e83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,8 +24,9 @@ mod jit_channel; mod transport; mod utils; +pub use transport::msgs::{LSPS_MESSAGE_TYPE_ID, RawLSPSMessage}; pub use jit_channel::event::Event as JITChannelEvent; -pub use jit_channel::msgs::{OpeningFeeParams, RawOpeningFeeParams}; +pub use jit_channel::msgs::{OpeningFeeParams, RawOpeningFeeParams, BuyResponse, GetInfoResponse}; pub use transport::message_handler::{ JITChannelsConfig, LiquidityManager, LiquidityProviderConfig, }; diff --git a/src/transport/message_handler.rs b/src/transport/message_handler.rs index 1a235b2..61cc981 100644 --- a/src/transport/message_handler.rs +++ b/src/transport/message_handler.rs @@ -1,7 +1,7 @@ use crate::events::{Event, EventQueue}; use crate::jit_channel::channel_manager::JITChannelManager; use crate::jit_channel::msgs::{OpeningFeeParams, RawOpeningFeeParams}; -use crate::transport::msgs::{LSPSMessage, RawLSPSMessage, LSPS_MESSAGE_TYPE}; +use crate::transport::msgs::{LSPSMessage, RawLSPSMessage, LSPS_MESSAGE_TYPE_ID}; use crate::transport::protocol::LSPS0MessageHandler; use bitcoin::secp256k1::PublicKey; @@ -88,6 +88,7 @@ pub struct LiquidityManager< RM: Deref, CM: Deref, OM: Deref, + CMH: Deref, NS: Deref, > where ES::Target: EntropySource, @@ -100,6 +101,7 @@ pub struct LiquidityManager< RM::Target: RoutingMessageHandler, CM::Target: ChannelMessageHandler, OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, NS::Target: NodeSigner, { pending_messages: Arc>>, @@ -107,7 +109,7 @@ pub struct LiquidityManager< request_id_to_method_map: Mutex>, lsps0_message_handler: LSPS0MessageHandler, lsps2_message_handler: - Option, NS>>, + Option>, provider_config: Option, channel_manager: Arc>, } @@ -124,8 +126,9 @@ impl< RM: Deref, CM: Deref, OM: Deref, + CMH: Deref, NS: Deref, - > LiquidityManager + > LiquidityManager where ES::Target: EntropySource, M::Target: chain::Watch<::Signer>, @@ -137,6 +140,7 @@ where RM::Target: RoutingMessageHandler, CM::Target: ChannelMessageHandler, OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, NS::Target: NodeSigner, { /// Constructor for the [`LiquidityManager`] @@ -145,8 +149,7 @@ where pub fn new( entropy_source: ES, provider_config: Option, channel_manager: Arc>, - ) -> Self -where { + ) -> Self { let pending_messages = Arc::new(Mutex::new(vec![])); let pending_events = Arc::new(EventQueue::default()); @@ -197,8 +200,8 @@ where { /// /// Without this the messages will be sent based on whatever polling interval /// your background processor uses. - pub fn set_peer_manager( - &self, peer_manager: Arc, NS>>, + pub fn set_peer_manager ( + &self, peer_manager: Arc>, ) { if let Some(lsps2_message_handler) = &self.lsps2_message_handler { lsps2_message_handler.set_peer_manager(peer_manager); @@ -389,8 +392,9 @@ impl< RM: Deref, CM: Deref, OM: Deref, + CMH: Deref, NS: Deref, - > CustomMessageReader for LiquidityManager + > CustomMessageReader for LiquidityManager where ES::Target: EntropySource, L::Target: Logger, @@ -402,15 +406,16 @@ where RM::Target: RoutingMessageHandler, CM::Target: ChannelMessageHandler, OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, NS::Target: NodeSigner, { type CustomMessage = RawLSPSMessage; - fn read( + fn read( &self, message_type: u16, buffer: &mut RD, ) -> Result, lightning::ln::msgs::DecodeError> { match message_type { - LSPS_MESSAGE_TYPE => Ok(Some(RawLSPSMessage::read(buffer)?)), + LSPS_MESSAGE_TYPE_ID => Ok(Some(RawLSPSMessage::read(buffer)?)), _ => Ok(None), } } @@ -428,8 +433,9 @@ impl< RM: Deref, CM: Deref, OM: Deref, + CMH: Deref, NS: Deref, - > CustomMessageHandler for LiquidityManager + > CustomMessageHandler for LiquidityManager where ES::Target: EntropySource, M::Target: chain::Watch<::Signer>, @@ -441,6 +447,7 @@ where RM::Target: RoutingMessageHandler, CM::Target: ChannelMessageHandler, OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, NS::Target: NodeSigner, { fn handle_custom_message( diff --git a/src/transport/msgs.rs b/src/transport/msgs.rs index 1d3ff96..11b0bb4 100644 --- a/src/transport/msgs.rs +++ b/src/transport/msgs.rs @@ -23,10 +23,13 @@ const JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE: &str = "parse error"; const LSPS0_LISTPROTOCOLS_METHOD_NAME: &str = "lsps0.listprotocols"; -pub const LSPS_MESSAGE_TYPE: u16 = 37913; +/// The lightning message type id for lsps messages +pub const LSPS_MESSAGE_TYPE_ID: u16 = 37913; +/// Lightning message type used by LSPS protocols #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct RawLSPSMessage { + /// The raw string payload that holds the actual message pub payload: String, } @@ -34,7 +37,7 @@ impl_writeable_msg!(RawLSPSMessage, { payload }, {}); impl wire::Type for RawLSPSMessage { fn type_id(&self) -> u16 { - LSPS_MESSAGE_TYPE + LSPS_MESSAGE_TYPE_ID } } From c18f3533565605ef2edf30785a364c5274e02e51 Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Wed, 27 Sep 2023 22:01:24 -0400 Subject: [PATCH 3/4] [wip] split inbound/outbound channels, use state transitions, cleanup --- Cargo.toml | 4 +- src/events.rs | 2 +- src/jit_channel/channel_manager.rs | 931 ++++++++++++++++++----------- src/jit_channel/event.rs | 6 - src/jit_channel/msgs.rs | 10 +- src/lib.rs | 4 +- src/transport/message_handler.rs | 13 +- src/transport/msgs.rs | 10 +- 8 files changed, 598 insertions(+), 382 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 39d7af1..8751bf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,8 @@ description = "Types and primitives to integrate a spec-compliant LSP with an LD # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lightning = { version = "0.0.116", default-features = false, features = ["max_level_trace", "no-std", "grind_signatures"] } -lightning-invoice = { version = "0.24.0", default-features = false, features = ["no-std"] } +lightning = { version = "0.0.117-alpha2", default-features = false, features = ["max_level_trace", "no-std", "grind_signatures"] } +lightning-invoice = { version = "0.25.0-alpha2", default-features = false, features = ["no-std"] } bitcoin = { version = "0.29.2", default-features = false, features = ["serde", "secp-recovery", "rand"] } diff --git a/src/events.rs b/src/events.rs index 620223d..477c6e6 100644 --- a/src/events.rs +++ b/src/events.rs @@ -54,7 +54,7 @@ impl EventQueue { } } -/// An Event which you should probably take some action in response to. +/// An event which you should probably take some action in response to. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Event { /// A LSPS2 (JIT Channel) protocol event diff --git a/src/jit_channel/channel_manager.rs b/src/jit_channel/channel_manager.rs index e4c9d75..a4d397c 100644 --- a/src/jit_channel/channel_manager.rs +++ b/src/jit_channel/channel_manager.rs @@ -19,10 +19,8 @@ use lightning::ln::channelmanager::{ChannelManager, InterceptId}; use lightning::ln::msgs::{ ChannelMessageHandler, ErrorAction, LightningError, OnionMessageHandler, RoutingMessageHandler, }; -use lightning::ln::peer_handler::{ - APeerManager, CustomMessageHandler, PeerManager, SocketDescriptor, -}; -use lightning::routing::gossip::NetworkGraph; +use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; +use lightning::ln::ChannelId; use lightning::routing::router::Router; use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; use lightning::util::errors::APIError; @@ -43,86 +41,283 @@ use super::msgs::{ const SUPPORTED_SPEC_VERSION: u16 = 1; -#[derive(PartialEq)] -enum JITChannelState { +struct ChannelStateError(String); + +impl From for LightningError { + fn from(value: ChannelStateError) -> Self { + LightningError { err: value.0, action: ErrorAction::IgnoreAndLog(Level::Info) } + } +} + +struct InboundJITChannelConfig { + pub user_id: u128, + pub token: Option, + pub payment_size_msat: Option, +} + +#[derive(PartialEq, Debug)] +enum InboundJITChannelState { VersionsRequested, MenuRequested, PendingMenuSelection, BuyRequested, PendingPayment, - Ready, } -struct JITChannel { +impl InboundJITChannelState { + fn versions_received(&self, versions: Vec) -> Result { + if !versions.contains(&SUPPORTED_SPEC_VERSION) { + return Err(ChannelStateError(format!( + "LSP does not support our specification version. ours = {}. theirs = {:?}", + SUPPORTED_SPEC_VERSION, versions + ))); + } + + match self { + InboundJITChannelState::VersionsRequested => Ok(InboundJITChannelState::MenuRequested), + state => Err(ChannelStateError(format!( + "Received unexpected get_versions response. JIT Channel was in state: {:?}", + state + ))), + } + } + + fn info_received(&self) -> Result { + match self { + InboundJITChannelState::MenuRequested => { + Ok(InboundJITChannelState::PendingMenuSelection) + } + state => Err(ChannelStateError(format!( + "Received unexpected get_info response. JIT Channel was in state: {:?}", + state + ))), + } + } + + fn opening_fee_params_selected(&self) -> Result { + match self { + InboundJITChannelState::PendingMenuSelection => { + Ok(InboundJITChannelState::BuyRequested) + } + state => Err(ChannelStateError(format!( + "Opening fee params selected when JIT Channel was in state: {:?}", + state + ))), + } + } + + fn invoice_params_received(&self) -> Result { + match self { + InboundJITChannelState::BuyRequested => Ok(InboundJITChannelState::PendingPayment), + state => Err(ChannelStateError(format!( + "Invoice params received when JIT Channel was in state: {:?}", + state + ))), + } + } +} + +struct InboundJITChannel { id: u128, - user_id: u128, - state: JITChannelState, - fees: Option, - token: Option, - min_payment_size_msat: Option, - max_payment_size_msat: Option, - payment_size_msat: Option, - counterparty_node_id: PublicKey, - amt_to_forward_msat: Option, - intercept_id: Option, - scid: Option, - lsp_cltv_expiry_delta: Option, + state: InboundJITChannelState, + config: InboundJITChannelConfig, } -impl JITChannel { +impl InboundJITChannel { pub fn new( - id: u128, counterparty_node_id: PublicKey, user_id: u128, payment_size_msat: Option, - token: Option, + id: u128, user_id: u128, payment_size_msat: Option, token: Option, ) -> Self { Self { id, - counterparty_node_id, - token, - user_id, - state: JITChannelState::VersionsRequested, - fees: None, - min_payment_size_msat: None, - max_payment_size_msat: None, + config: InboundJITChannelConfig { user_id, payment_size_msat, token }, + state: InboundJITChannelState::VersionsRequested, + } + } + + pub fn versions_received(&mut self, versions: Vec) -> Result<(), LightningError> { + self.state = self.state.versions_received(versions)?; + Ok(()) + } + + pub fn info_received(&mut self) -> Result<(), LightningError> { + self.state = self.state.info_received()?; + Ok(()) + } + + pub fn opening_fee_params_selected(&mut self) -> Result<(), LightningError> { + self.state = self.state.opening_fee_params_selected()?; + Ok(()) + } + + pub fn invoice_params_received(&mut self) -> Result<(), LightningError> { + self.state = self.state.invoice_params_received()?; + Ok(()) + } +} + +#[derive(PartialEq, Debug)] +enum OutboundJITChannelState { + InvoiceParametersGenerated { + scid: u64, + cltv_expiry_delta: u32, + payment_size_msat: Option, + opening_fee_params: OpeningFeeParams, + }, + PendingChannelOpen { + intercept_id: InterceptId, + opening_fee_msat: u64, + amt_to_forward_msat: u64, + }, + ChannelReady { + intercept_id: InterceptId, + amt_to_forward_msat: u64, + }, +} + +impl OutboundJITChannelState { + pub fn new( + scid: u64, cltv_expiry_delta: u32, payment_size_msat: Option, + opening_fee_params: OpeningFeeParams, + ) -> Self { + OutboundJITChannelState::InvoiceParametersGenerated { + scid, + cltv_expiry_delta, payment_size_msat, - scid: None, - amt_to_forward_msat: None, - lsp_cltv_expiry_delta: None, - intercept_id: None, + opening_fee_params, + } + } + + pub fn htlc_intercepted( + &self, expected_outbound_amount_msat: u64, intercept_id: InterceptId, + ) -> Result { + match self { + OutboundJITChannelState::InvoiceParametersGenerated { opening_fee_params, .. } => { + let opening_fee_msat: Option = utils::compute_opening_fee( + expected_outbound_amount_msat, + opening_fee_params.min_fee_msat, + opening_fee_params.proportional as u64, + ); + + if let Some(opening_fee_msat) = opening_fee_msat { + Ok(OutboundJITChannelState::PendingChannelOpen { + intercept_id, + opening_fee_msat, + amt_to_forward_msat: expected_outbound_amount_msat - opening_fee_msat, + }) + } else { + Err(ChannelStateError(format!("Could not compute valid opening fee with min_fee_msat = {}, proportional = {}, and expected_outbound_amount_msat = {}", opening_fee_params.min_fee_msat, opening_fee_params.proportional, expected_outbound_amount_msat))) + } + } + state => Err(ChannelStateError(format!( + "Invoice params received when JIT Channel was in state: {:?}", + state + ))), + } + } + + pub fn channel_ready(&self) -> Result { + match self { + OutboundJITChannelState::PendingChannelOpen { + intercept_id, + amt_to_forward_msat, + .. + } => Ok(OutboundJITChannelState::ChannelReady { + intercept_id: *intercept_id, + amt_to_forward_msat: *amt_to_forward_msat, + }), + state => Err(ChannelStateError(format!( + "Channel ready received when JIT Channel was in state: {:?}", + state + ))), + } + } +} + +struct OutboundJITChannel { + state: OutboundJITChannelState, +} + +impl OutboundJITChannel { + pub fn new( + scid: u64, cltv_expiry_delta: u32, payment_size_msat: Option, + opening_fee_params: OpeningFeeParams, + ) -> Self { + Self { + state: OutboundJITChannelState::new( + scid, + cltv_expiry_delta, + payment_size_msat, + opening_fee_params, + ), + } + } + + pub fn htlc_intercepted( + &mut self, expected_outbound_amount_msat: u64, intercept_id: InterceptId, + ) -> Result<(u64, u64), LightningError> { + self.state = self.state.htlc_intercepted(expected_outbound_amount_msat, intercept_id)?; + + match &self.state { + OutboundJITChannelState::PendingChannelOpen { + opening_fee_msat, + amt_to_forward_msat, + .. + } => Ok((*opening_fee_msat, *amt_to_forward_msat)), + impossible_state => Err(LightningError { + err: format!( + "Impossible state transition during htlc_intercepted to {:?}", + impossible_state + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }), + } + } + + pub fn channel_ready(&mut self) -> Result<(InterceptId, u64), LightningError> { + self.state = self.state.channel_ready()?; + + match &self.state { + OutboundJITChannelState::ChannelReady { intercept_id, amt_to_forward_msat } => { + Ok((*intercept_id, *amt_to_forward_msat)) + } + impossible_state => Err(LightningError { + err: format!( + "Impossible state transition during channel_ready to {:?}", + impossible_state + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }), } } } #[derive(Default)] struct PeerState { - channels_by_id: HashMap, + inbound_channels_by_id: HashMap, + outbound_channels_by_scid: HashMap, request_to_cid: HashMap, pending_requests: HashMap, } impl PeerState { - pub fn insert_channel(&mut self, channel_id: u128, channel: JITChannel) { - self.channels_by_id.insert(channel_id, channel); + pub fn insert_inbound_channel(&mut self, jit_channel_id: u128, channel: InboundJITChannel) { + self.inbound_channels_by_id.insert(jit_channel_id, channel); } - pub fn insert_request(&mut self, request_id: RequestId, channel_id: u128) { - self.request_to_cid.insert(request_id, channel_id); + pub fn insert_outbound_channel(&mut self, scid: u64, channel: OutboundJITChannel) { + self.outbound_channels_by_scid.insert(scid, channel); } - pub fn get_channel_in_state_for_request( - &mut self, request_id: &RequestId, state: JITChannelState, - ) -> Option<&mut JITChannel> { - let channel_id = self.request_to_cid.remove(request_id)?; + pub fn insert_request(&mut self, request_id: RequestId, jit_channel_id: u128) { + self.request_to_cid.insert(request_id, jit_channel_id); + } - if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { - if channel.state == state { - return Some(channel); - } - } - None + pub fn remove_inbound_channel(&mut self, jit_channel_id: u128) { + self.inbound_channels_by_id.remove(&jit_channel_id); } - pub fn remove_channel(&mut self, channel_id: u128) { - self.channels_by_id.remove(&channel_id); + pub fn remove_outbound_channel(&mut self, scid: u64) { + self.outbound_channels_by_scid.remove(&scid); } } @@ -160,7 +355,7 @@ pub struct JITChannelManager< pending_messages: Arc>>, pending_events: Arc, per_peer_state: RwLock>>, - channels_by_scid: RwLock>, + peer_by_scid: RwLock>, promise_secret: [u8; 32], } @@ -205,12 +400,17 @@ where pending_messages, pending_events, per_peer_state: RwLock::new(HashMap::new()), - channels_by_scid: RwLock::new(HashMap::new()), + peer_by_scid: RwLock::new(HashMap::new()), peer_manager: Mutex::new(None), channel_manager, } } + fn map_scid_to_peer(&self, scid: u64, counterparty_node_id: PublicKey) { + let mut peer_by_scid = self.peer_by_scid.write().unwrap(); + peer_by_scid.insert(scid, counterparty_node_id); + } + pub fn set_peer_manager( &self, peer_manager: Arc>, ) { @@ -221,23 +421,19 @@ where &self, counterparty_node_id: PublicKey, payment_size_msat: Option, token: Option, user_channel_id: u128, ) { - let channel_id = self.generate_channel_id(); - let channel = JITChannel::new( - channel_id, - counterparty_node_id, - user_channel_id, - payment_size_msat, - token, - ); + let jit_channel_id = self.generate_jit_channel_id(); + let channel = + InboundJITChannel::new(jit_channel_id, user_channel_id, payment_size_msat, token); - let mut per_peer_state = self.per_peer_state.write().unwrap(); - let peer_state_mutex = - per_peer_state.entry(counterparty_node_id).or_insert(Mutex::new(PeerState::default())); - let peer_state = peer_state_mutex.get_mut().unwrap(); - peer_state.insert_channel(channel_id, channel); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let inner_state_lock = outer_state_lock + .entry(counterparty_node_id) + .or_insert(Mutex::new(PeerState::default())); + let peer_state = inner_state_lock.get_mut().unwrap(); + peer_state.insert_inbound_channel(jit_channel_id, channel); let request_id = self.generate_request_id(); - peer_state.insert_request(request_id.clone(), channel_id); + peer_state.insert_request(request_id.clone(), jit_channel_id); { let mut pending_messages = self.pending_messages.lock().unwrap(); @@ -257,11 +453,11 @@ where opening_fee_params_menu: Vec, min_payment_size_msat: u64, max_payment_size_msat: u64, ) -> Result<(), APIError> { - let per_peer_state = self.per_peer_state.read().unwrap(); + let outer_state_lock = self.per_peer_state.read().unwrap(); - match per_peer_state.get(&counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); + match outer_state_lock.get(&counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); match peer_state.pending_requests.remove(&request_id) { Some(Request::GetInfo(_)) => { @@ -291,49 +487,46 @@ where } pub fn opening_fee_params_selected( - &self, counterparty_node_id: PublicKey, channel_id: u128, + &self, counterparty_node_id: PublicKey, jit_channel_id: u128, opening_fee_params: OpeningFeeParams, ) -> Result<(), APIError> { - let per_peer_state = self.per_peer_state.read().unwrap(); - match per_peer_state.get(&counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); - - if let Some(channel) = peer_state.channels_by_id.get_mut(&channel_id) { - if channel.state == JITChannelState::PendingMenuSelection { - channel.state = JITChannelState::BuyRequested; - channel.fees = Some(opening_fee_params.clone()); + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(&counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); + if let Some(jit_channel) = + peer_state.inbound_channels_by_id.get_mut(&jit_channel_id) + { + if let Err(e) = jit_channel.opening_fee_params_selected() { + peer_state.remove_inbound_channel(jit_channel_id); + return Err(APIError::APIMisuseError { err: e.err }); + } - let request_id = self.generate_request_id(); - let payment_size_msat = channel.payment_size_msat; - peer_state.insert_request(request_id.clone(), channel_id); + let request_id = self.generate_request_id(); + let payment_size_msat = jit_channel.config.payment_size_msat; + peer_state.insert_request(request_id.clone(), jit_channel_id); - { - let mut pending_messages = self.pending_messages.lock().unwrap(); - pending_messages.push(( - counterparty_node_id, - Message::Request( - request_id, - Request::Buy(BuyRequest { - version: SUPPORTED_SPEC_VERSION, - opening_fee_params, - payment_size_msat, - }), - ) - .into(), - )); - } - if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { - peer_manager.process_events(); - } - } else { - return Err(APIError::APIMisuseError { - err: "Channel is not pending menu selection".to_string(), - }); + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + counterparty_node_id, + Message::Request( + request_id, + Request::Buy(BuyRequest { + version: SUPPORTED_SPEC_VERSION, + opening_fee_params, + payment_size_msat, + }), + ) + .into(), + )); + } + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); } } else { return Err(APIError::APIMisuseError { - err: format!("Channel with id {} not found", channel_id), + err: format!("Channel with id {} not found", jit_channel_id), }); } } @@ -351,34 +544,24 @@ where &self, counterparty_node_id: PublicKey, request_id: RequestId, scid: u64, cltv_expiry_delta: u32, client_trusts_lsp: bool, ) -> Result<(), APIError> { - let per_peer_state = self.per_peer_state.read().unwrap(); + let outer_state_lock = self.per_peer_state.read().unwrap(); - match per_peer_state.get(&counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); + match outer_state_lock.get(&counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); match peer_state.pending_requests.remove(&request_id) { Some(Request::Buy(buy_request)) => { - let mut channels_by_scid = self.channels_by_scid.write().unwrap(); - channels_by_scid.insert( + self.map_scid_to_peer(scid, counterparty_node_id.clone()); + let outbound_jit_channel = OutboundJITChannel::new( scid, - JITChannel { - id: 0, - user_id: 0, - state: JITChannelState::BuyRequested, - fees: Some(buy_request.opening_fee_params), - token: None, - min_payment_size_msat: None, - max_payment_size_msat: None, - payment_size_msat: buy_request.payment_size_msat, - counterparty_node_id, - scid: Some(scid), - lsp_cltv_expiry_delta: Some(cltv_expiry_delta), - amt_to_forward_msat: None, - intercept_id: None, - }, + cltv_expiry_delta, + buy_request.payment_size_msat, + buy_request.opening_fee_params, ); + peer_state.insert_outbound_channel(scid, outbound_jit_channel); + let block = scid_utils::block_from_scid(&scid); let tx_index = scid_utils::tx_index_from_scid(&scid); let vout = scid_utils::vout_from_scid(&scid); @@ -408,39 +591,45 @@ where } } - // need to decide if we should ignore, enqueue OpenChannel event, or enqueue FailInterceptedHTLC event pub(crate) fn htlc_intercepted( &self, scid: u64, intercept_id: InterceptId, inbound_amount_msat: u64, expected_outbound_amount_msat: u64, ) -> Result<(), APIError> { - let mut channels_by_scid = self.channels_by_scid.write().unwrap(); - - if let Some(channel) = channels_by_scid.get_mut(&scid) { - if let Some(fees) = &channel.fees { - let opening_fee_msat = utils::compute_opening_fee( - expected_outbound_amount_msat, - fees.min_fee_msat, - fees.proportional as u64, - ); - - if let Some(opening_fee_msat) = opening_fee_msat { - let amt_to_forward_msat = expected_outbound_amount_msat - opening_fee_msat; - channel.amt_to_forward_msat = Some(amt_to_forward_msat); - channel.intercept_id = Some(intercept_id); - - self.enqueue_event(Event::LSPS2(crate::JITChannelEvent::OpenChannel { - their_network_key: channel.counterparty_node_id, - inbound_amount_msat, - expected_outbound_amount_msat, - amt_to_forward_msat, - opening_fee_msat, - user_channel_id: scid as u128, - })); - } else { - self.channel_manager.fail_intercepted_htlc(intercept_id)?; + let peer_by_scid = self.peer_by_scid.read().unwrap(); + if let Some(counterparty_node_id) = peer_by_scid.get(&scid) { + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); + if let Some(jit_channel) = peer_state.outbound_channels_by_scid.get_mut(&scid) { + match jit_channel + .htlc_intercepted(expected_outbound_amount_msat, intercept_id) + { + Ok((opening_fee_msat, amt_to_forward_msat)) => { + self.enqueue_event(Event::LSPS2( + crate::JITChannelEvent::OpenChannel { + their_network_key: counterparty_node_id.clone(), + inbound_amount_msat, + expected_outbound_amount_msat, + amt_to_forward_msat, + opening_fee_msat, + user_channel_id: scid as u128, + }, + )); + } + Err(e) => { + self.channel_manager.fail_intercepted_htlc(intercept_id)?; + // remove channel? + return Err(APIError::APIMisuseError { err: e.err }); + } + } + } + } + None => { + return Err(APIError::APIMisuseError { + err: format!("No counterparty found for scid: {}", scid), + }); } - } else { - self.channel_manager.fail_intercepted_htlc(intercept_id)?; } } @@ -449,36 +638,53 @@ where // figure out which intercept id is waiting on this channel and enqueue ForwardInterceptedHTLC event pub(crate) fn channel_ready( - &self, user_channel_id: u128, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, + &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) -> Result<(), APIError> { - let channels_by_scid = self.channels_by_scid.read().unwrap(); - if let Ok(scid) = user_channel_id.try_into() { - if let Some(channel) = channels_by_scid.get(&scid) { - self.channel_manager.forward_intercepted_htlc( - channel.intercept_id.unwrap(), - channel_id, - *counterparty_node_id, - channel.amt_to_forward_msat.unwrap(), - )?; - } else { - return Err(APIError::APIMisuseError { - err: format!( - "Could not find a channel with user_channel_id {}", - user_channel_id - ), - }); + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); + if let Some(jit_channel) = peer_state.outbound_channels_by_scid.get_mut(&scid) { + match jit_channel.channel_ready() { + Ok((intercept_id, amt_to_forward_msat)) => { + self.channel_manager.forward_intercepted_htlc( + intercept_id, + channel_id, + *counterparty_node_id, + amt_to_forward_msat, + )?; + } + Err(e) => { + return Err(APIError::APIMisuseError { + err: format!( + "Failed to transition to channel ready: {}", + e.err + ), + }) + } + } + } else { + return Err(APIError::APIMisuseError { + err: format!( + "Could not find a channel with user_channel_id {}", + user_channel_id + ), + }); + } + } + None => { + return Err(APIError::APIMisuseError { + err: format!("No counterparty state for: {}", counterparty_node_id), + }); + } } - } else { - return Err(APIError::APIMisuseError { - err: format!("Could not parse user_channel_id into u64 scid {}", user_channel_id), - }); } Ok(()) } - fn generate_channel_id(&self) -> u128 { + fn generate_jit_channel_id(&self) -> u128 { let bytes = self.entropy_source.get_secure_random_bytes(); let mut id_bytes: [u8; 16] = [0; 16]; id_bytes.copy_from_slice(&bytes[0..16]); @@ -523,56 +729,58 @@ where fn handle_get_versions_response( &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetVersionsResponse, ) -> Result<(), LightningError> { - let per_peer_state = self.per_peer_state.read().unwrap(); - match per_peer_state.get(counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); - - match peer_state.get_channel_in_state_for_request( - &request_id, - JITChannelState::VersionsRequested, - ) { - Some(channel) => { - let channel_id = channel.id; - let token = channel.token.clone(); - - if result.versions.contains(&SUPPORTED_SPEC_VERSION) { - channel.state = JITChannelState::MenuRequested; - - let request_id = self.generate_request_id(); - peer_state.insert_request(request_id.clone(), channel_id); - - { - let mut pending_messages = self.pending_messages.lock().unwrap(); - pending_messages.push(( - *counterparty_node_id, - Message::Request( - request_id, - Request::GetInfo(GetInfoRequest { - version: SUPPORTED_SPEC_VERSION, - token, - }), - ) - .into(), - )); - } + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); - if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { - peer_manager.process_events(); - } - } else { - peer_state.remove_channel(channel_id); - } - } - None => { - return Err(LightningError { - err: format!( - "Received get_versions response without a matching channel: {:?}", - request_id - ), - action: ErrorAction::IgnoreAndLog(Level::Info), - }) - } + let jit_channel_id = + peer_state.request_to_cid.remove(&request_id).ok_or(LightningError { + err: format!( + "Received get_versions response for an unknown request: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + + let jit_channel = peer_state + .inbound_channels_by_id + .get_mut(&jit_channel_id) + .ok_or(LightningError { + err: format!( + "Received get_versions response for an unknown channel: {:?}", + jit_channel_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + + let token = jit_channel.config.token.clone(); + + if let Err(e) = jit_channel.versions_received(result.versions) { + peer_state.remove_inbound_channel(jit_channel_id); + return Err(e); + } + + let request_id = self.generate_request_id(); + peer_state.insert_request(request_id.clone(), jit_channel_id); + + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + *counterparty_node_id, + Message::Request( + request_id, + Request::GetInfo(GetInfoRequest { + version: SUPPORTED_SPEC_VERSION, + token, + }), + ) + .into(), + )); + } + + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); } } None => { @@ -592,10 +800,11 @@ where fn handle_get_info_request( &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: GetInfoRequest, ) -> Result<(), LightningError> { - let mut per_peer_state = self.per_peer_state.write().unwrap(); - let peer_state_mutex: &mut Mutex = - per_peer_state.entry(*counterparty_node_id).or_insert(Mutex::new(PeerState::default())); - let peer_state = peer_state_mutex.get_mut().unwrap(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let inner_state_lock: &mut Mutex = outer_state_lock + .entry(*counterparty_node_id) + .or_insert(Mutex::new(PeerState::default())); + let peer_state = inner_state_lock.get_mut().unwrap(); peer_state.pending_requests.insert(request_id.clone(), Request::GetInfo(params.clone())); self.enqueue_event(Event::LSPS2(super::event::Event::GetInfo { @@ -610,38 +819,44 @@ where fn handle_get_info_response( &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse, ) -> Result<(), LightningError> { - let per_peer_state = self.per_peer_state.read().unwrap(); - match per_peer_state.get(counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); - match peer_state - .get_channel_in_state_for_request(&request_id, JITChannelState::MenuRequested) - { - Some(channel) => { - channel.state = JITChannelState::PendingMenuSelection; - channel.min_payment_size_msat = Some(result.min_payment_size_msat); - channel.max_payment_size_msat = Some(result.max_payment_size_msat); - - self.enqueue_event(Event::LSPS2(super::event::Event::GetInfoResponse { - counterparty_node_id: *counterparty_node_id, - opening_fee_params_menu: result.opening_fee_params_menu, - min_payment_size_msat: result.min_payment_size_msat, - max_payment_size_msat: result.max_payment_size_msat, - channel_id: channel.id, - user_channel_id: channel.user_id, - })); - } - None => { - return Err(LightningError { - err: format!( - "Received get_info response without a matching channel: {:?}", - request_id - ), - action: ErrorAction::IgnoreAndLog(Level::Info), - }) - } + let jit_channel_id = + peer_state.request_to_cid.remove(&request_id).ok_or(LightningError { + err: format!( + "Received get_info response for an unknown request: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + + let jit_channel = peer_state + .inbound_channels_by_id + .get_mut(&jit_channel_id) + .ok_or(LightningError { + err: format!( + "Received get_info response for an unknown channel: {:?}", + jit_channel_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + + if let Err(e) = jit_channel.info_received() { + peer_state.remove_inbound_channel(jit_channel_id); + return Err(e); } + + self.enqueue_event(Event::LSPS2(super::event::Event::GetInfoResponse { + counterparty_node_id: *counterparty_node_id, + opening_fee_params_menu: result.opening_fee_params_menu, + min_payment_size_msat: result.min_payment_size_msat, + max_payment_size_msat: result.max_payment_size_msat, + channel_id: jit_channel.id, + user_channel_id: jit_channel.config.user_id, + })); } None => { return Err(LightningError { @@ -658,37 +873,36 @@ where } fn handle_get_info_error( - &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, + &self, request_id: RequestId, counterparty_node_id: &PublicKey, _error: ResponseError, ) -> Result<(), LightningError> { - let per_peer_state = self.per_peer_state.read().unwrap(); - match per_peer_state.get(counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); - match peer_state - .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) - { - Some(channel) => { - let channel_id = channel.id; - peer_state.remove_channel(channel_id); - return Err(LightningError { - err: format!("Received error response from getinfo request ({:?}) with counterparty {:?}. Removing channel {}. code = {}, message = {}", request_id, counterparty_node_id, channel_id, error.code, error.message), - action: ErrorAction::IgnoreAndLog(Level::Info) - }); - } - None => { - return Err(LightningError { - err: format!("Received an unexpected error response for a getinfo request from counterparty ({:?})", counterparty_node_id), - action: ErrorAction::IgnoreAndLog(Level::Info) - }); - } - } + let jit_channel_id = + peer_state.request_to_cid.remove(&request_id).ok_or(LightningError { + err: format!( + "Received get_info error for an unknown request: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + + let _jit_channel = peer_state + .inbound_channels_by_id + .remove(&jit_channel_id) + .ok_or(LightningError { + err: format!( + "Received get_info error for an unknown channel: {:?}", + jit_channel_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + Ok(()) } None => { - return Err(LightningError { - err: format!("Received error response for a getinfo request from an unknown counterparty ({:?})", counterparty_node_id), - action: ErrorAction::IgnoreAndLog(Level::Info) - }); + return Err(LightningError { err: format!("Received error response for a get_info request from an unknown counterparty ({:?})",counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); } } } @@ -697,11 +911,11 @@ where &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest, ) -> Result<(), LightningError> { if params.opening_fee_params.is_valid(&self.promise_secret) { - let mut per_peer_state = self.per_peer_state.write().unwrap(); - let peer_state_mutex = per_peer_state + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let inner_state_lock = outer_state_lock .entry(*counterparty_node_id) .or_insert(Mutex::new(PeerState::default())); - let peer_state = peer_state_mutex.get_mut().unwrap(); + let peer_state = inner_state_lock.get_mut().unwrap(); peer_state.pending_requests.insert(request_id.clone(), Request::Buy(params.clone())); self.enqueue_event(Event::LSPS2(super::event::Event::BuyRequest { @@ -718,55 +932,55 @@ where fn handle_buy_response( &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: BuyResponse, ) -> Result<(), LightningError> { - let per_peer_state = self.per_peer_state.read().unwrap(); - match per_peer_state.get(counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); - match peer_state - .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) - { - Some(channel) => { - channel.state = JITChannelState::PendingPayment; - channel.lsp_cltv_expiry_delta = Some(result.lsp_cltv_expiry_delta); + let jit_channel_id = + peer_state.request_to_cid.remove(&request_id).ok_or(LightningError { + err: format!( + "Received buy response for an unknown request: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; - if let Ok(scid) = - scid_utils::scid_from_human_readable_string(&result.jit_channel_scid) - { - channel.scid = Some(scid); - - self.enqueue_event(Event::LSPS2( - super::event::Event::InvoiceGenerationReady { - counterparty_node_id: *counterparty_node_id, - scid, - cltv_expiry_delta: result.lsp_cltv_expiry_delta, - min_payment_size_msat: channel.min_payment_size_msat, - max_payment_size_msat: channel.max_payment_size_msat, - payment_size_msat: channel.payment_size_msat, - fees: channel.fees.clone().unwrap(), - client_trusts_lsp: result.client_trusts_lsp, - user_channel_id: channel.user_id, - }, - )); - } else { - return Err(LightningError { - err: format!( - "Received buy response with an invalid scid {}", - result.jit_channel_scid - ), - action: ErrorAction::IgnoreAndLog(Level::Info), - }); - } - } - None => { - return Err(LightningError { - err: format!( - "Received buy response without a matching channel: {:?}", - request_id - ), - action: ErrorAction::IgnoreAndLog(Level::Info), - }); - } + let jit_channel = peer_state + .inbound_channels_by_id + .get_mut(&jit_channel_id) + .ok_or(LightningError { + err: format!( + "Received buy response for an unknown channel: {:?}", + jit_channel_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + + if let Err(e) = jit_channel.invoice_params_received() { + peer_state.remove_inbound_channel(jit_channel_id); + return Err(e); + } + + if let Ok(scid) = + scid_utils::scid_from_human_readable_string(&result.jit_channel_scid) + { + self.enqueue_event(Event::LSPS2(super::event::Event::InvoiceGenerationReady { + counterparty_node_id: *counterparty_node_id, + scid, + cltv_expiry_delta: result.lsp_cltv_expiry_delta, + payment_size_msat: jit_channel.config.payment_size_msat, + client_trusts_lsp: result.client_trusts_lsp, + user_channel_id: jit_channel.config.user_id, + })); + } else { + return Err(LightningError { + err: format!( + "Received buy response with an invalid scid {}", + result.jit_channel_scid + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }); } } None => { @@ -783,25 +997,30 @@ where } fn handle_buy_error( - &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, + &self, request_id: RequestId, counterparty_node_id: &PublicKey, _error: ResponseError, ) -> Result<(), LightningError> { - let per_peer_state = self.per_peer_state.read().unwrap(); - match per_peer_state.get(counterparty_node_id) { - Some(peer_state_mutex) => { - let mut peer_state = peer_state_mutex.lock().unwrap(); - - match peer_state - .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) - { - Some(channel) => { - let channel_id = channel.id; - peer_state.remove_channel(channel_id); - return Err(LightningError { err: format!( "Received error response from buy request ({:?}) with counterparty {:?}. Removing channel {}. code = {}, message = {}", request_id, counterparty_node_id, channel_id, error.code, error.message), action: ErrorAction::IgnoreAndLog(Level::Info)}); - } - None => { - return Err(LightningError { err: format!("Received an unexpected error response for a buy request from counterparty ({:?})", counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); - } - } + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); + + let jit_channel_id = + peer_state.request_to_cid.remove(&request_id).ok_or(LightningError { + err: format!("Received buy error for an unknown request: {:?}", request_id), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + + let _jit_channel = peer_state + .inbound_channels_by_id + .remove(&jit_channel_id) + .ok_or(LightningError { + err: format!( + "Received buy error for an unknown channel: {:?}", + jit_channel_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + })?; + Ok(()) } None => { return Err(LightningError { err: format!("Received error response for a buy request from an unknown counterparty ({:?})",counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); diff --git a/src/jit_channel/event.rs b/src/jit_channel/event.rs index a86b7c0..ba41444 100644 --- a/src/jit_channel/event.rs +++ b/src/jit_channel/event.rs @@ -79,12 +79,6 @@ pub enum Event { scid: u64, /// The cltv_expiry_delta to use in the route hint. cltv_expiry_delta: u32, - /// The agreed upon channel parameters. - fees: OpeningFeeParams, - /// The min payment size allowed. - min_payment_size_msat: Option, - /// The max payment size allowed. - max_payment_size_msat: Option, /// The initial payment size you specified. payment_size_msat: Option, /// The trust model the lsp expects. diff --git a/src/jit_channel/msgs.rs b/src/jit_channel/msgs.rs index 10bd18f..167eef0 100644 --- a/src/jit_channel/msgs.rs +++ b/src/jit_channel/msgs.rs @@ -8,8 +8,8 @@ use serde::{Deserialize, Serialize}; use crate::transport::msgs::{LSPSMessage, RequestId, ResponseError}; use crate::utils; -pub(crate) const LSPS2_GETVERSIONS_METHOD_NAME: &str = "lsps2.getversions"; -pub(crate) const LSPS2_GETINFO_METHOD_NAME: &str = "lsps2.getinfo"; +pub(crate) const LSPS2_GET_VERSIONS_METHOD_NAME: &str = "lsps2.get_versions"; +pub(crate) const LSPS2_GET_INFO_METHOD_NAME: &str = "lsps2.get_info"; pub(crate) const LSPS2_BUY_METHOD_NAME: &str = "lsps2.buy"; #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Default)] @@ -118,7 +118,7 @@ pub struct BuyRequest { } /// A response from a buy request made by a client -/// +/// /// Includes information needed to construct an invoice. #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct BuyResponse { @@ -141,8 +141,8 @@ pub enum Request { impl Request { pub fn method(&self) -> &str { match self { - Request::GetVersions(_) => LSPS2_GETVERSIONS_METHOD_NAME, - Request::GetInfo(_) => LSPS2_GETINFO_METHOD_NAME, + Request::GetVersions(_) => LSPS2_GET_VERSIONS_METHOD_NAME, + Request::GetInfo(_) => LSPS2_GET_INFO_METHOD_NAME, Request::Buy(_) => LSPS2_BUY_METHOD_NAME, } } diff --git a/src/lib.rs b/src/lib.rs index e8c4e83..dba71d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,9 +24,9 @@ mod jit_channel; mod transport; mod utils; -pub use transport::msgs::{LSPS_MESSAGE_TYPE_ID, RawLSPSMessage}; pub use jit_channel::event::Event as JITChannelEvent; -pub use jit_channel::msgs::{OpeningFeeParams, RawOpeningFeeParams, BuyResponse, GetInfoResponse}; +pub use jit_channel::msgs::{BuyResponse, GetInfoResponse, OpeningFeeParams, RawOpeningFeeParams}; pub use transport::message_handler::{ JITChannelsConfig, LiquidityManager, LiquidityProviderConfig, }; +pub use transport::msgs::{RawLSPSMessage, LSPS_MESSAGE_TYPE_ID}; diff --git a/src/transport/message_handler.rs b/src/transport/message_handler.rs index 06f0027..b80c826 100644 --- a/src/transport/message_handler.rs +++ b/src/transport/message_handler.rs @@ -5,8 +5,8 @@ use crate::transport::msgs::{LSPSMessage, RawLSPSMessage, LSPS_MESSAGE_TYPE_ID}; use crate::transport::protocol::LSPS0MessageHandler; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use lightning::ln::channelmanager::InterceptId; use lightning::chain::{self, BestBlock, Confirm, Filter, Listen}; +use lightning::ln::channelmanager::InterceptId; use lightning::ln::channelmanager::{ChainParameters, ChannelManager}; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ @@ -14,6 +14,7 @@ use lightning::ln::msgs::{ }; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::ln::wire::CustomMessageReader; +use lightning::ln::ChannelId; use lightning::routing::router::Router; use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; use lightning::util::errors::APIError; @@ -216,7 +217,7 @@ where { /// /// Without this the messages will be sent based on whatever polling interval /// your background processor uses. - pub fn set_peer_manager ( + pub fn set_peer_manager( &self, peer_manager: Arc>, ) { if let Some(lsps2_message_handler) = &self.lsps2_message_handler { @@ -355,7 +356,7 @@ where { /// Will forward the intercepted HTLC if it matches a channel /// we need to forward a payment over otherwise it will be ignored. pub fn channel_ready( - &self, user_channel_id: u128, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, + &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) -> Result<(), APIError> { if let Some(lsps2_message_handler) = &self.lsps2_message_handler { lsps2_message_handler.channel_ready( @@ -411,7 +412,8 @@ impl< CMH: Deref, NS: Deref, C: Deref, - > CustomMessageReader for LiquidityManager + > CustomMessageReader + for LiquidityManager where ES::Target: EntropySource, L::Target: Logger, @@ -454,7 +456,8 @@ impl< CMH: Deref, NS: Deref, C: Deref, - > CustomMessageHandler for LiquidityManager + > CustomMessageHandler + for LiquidityManager where ES::Target: EntropySource, M::Target: chain::Watch<::Signer>, diff --git a/src/transport/msgs.rs b/src/transport/msgs.rs index c945c11..ec7a12c 100644 --- a/src/transport/msgs.rs +++ b/src/transport/msgs.rs @@ -22,7 +22,7 @@ const JSONRPC_INVALID_MESSAGE_ERROR_CODE: i32 = -32700; const JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE: &str = "parse error"; const LSPS0_LISTPROTOCOLS_METHOD_NAME: &str = "lsps0.list_protocols"; -/// The lightning message type id for lsps messages +/// The Lightning message type id for LSPS messages pub const LSPS_MESSAGE_TYPE_ID: u16 = 37913; /// Lightning message type used by LSPS protocols @@ -277,7 +277,7 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { LSPS0Request::ListProtocols(ListProtocolsRequest {}), ))) } - jit_channel::msgs::LSPS2_GETVERSIONS_METHOD_NAME => { + jit_channel::msgs::LSPS2_GET_VERSIONS_METHOD_NAME => { let request = serde_json::from_value(params.unwrap_or(json!({}))) .map_err(de::Error::custom)?; Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( @@ -285,7 +285,7 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { jit_channel::msgs::Request::GetVersions(request), ))) } - jit_channel::msgs::LSPS2_GETINFO_METHOD_NAME => { + jit_channel::msgs::LSPS2_GET_INFO_METHOD_NAME => { let request = serde_json::from_value(params.unwrap_or(json!({}))) .map_err(de::Error::custom)?; Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( @@ -325,7 +325,7 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { Err(de::Error::custom("Received invalid JSON-RPC object: one of method, result, or error required")) } } - jit_channel::msgs::LSPS2_GETVERSIONS_METHOD_NAME => { + jit_channel::msgs::LSPS2_GET_VERSIONS_METHOD_NAME => { if let Some(result) = result { let response = serde_json::from_value(result).map_err(de::Error::custom)?; @@ -337,7 +337,7 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { Err(de::Error::custom("Received invalid lsps2.getversions response.")) } } - jit_channel::msgs::LSPS2_GETINFO_METHOD_NAME => { + jit_channel::msgs::LSPS2_GET_INFO_METHOD_NAME => { if let Some(error) = error { Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( RequestId(id), From c1b8114a172d61ff57881082f011a4e405785ebe Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Wed, 4 Oct 2023 21:46:07 -0400 Subject: [PATCH 4/4] wip --- src/jit_channel/channel_manager.rs | 24 ++++++++++++++++++------ src/jit_channel/msgs.rs | 1 + 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/jit_channel/channel_manager.rs b/src/jit_channel/channel_manager.rs index a4d397c..2e58c8f 100644 --- a/src/jit_channel/channel_manager.rs +++ b/src/jit_channel/channel_manager.rs @@ -61,7 +61,7 @@ enum InboundJITChannelState { MenuRequested, PendingMenuSelection, BuyRequested, - PendingPayment, + PendingPayment { client_trusts_lsp: bool, scid: String }, } impl InboundJITChannelState { @@ -106,9 +106,9 @@ impl InboundJITChannelState { } } - fn invoice_params_received(&self) -> Result { + fn invoice_params_received(&self, client_trusts_lsp: bool, scid: String) -> Result { match self { - InboundJITChannelState::BuyRequested => Ok(InboundJITChannelState::PendingPayment), + InboundJITChannelState::BuyRequested => Ok(InboundJITChannelState::PendingPayment { client_trusts_lsp, scid }), state => Err(ChannelStateError(format!( "Invoice params received when JIT Channel was in state: {:?}", state @@ -149,8 +149,8 @@ impl InboundJITChannel { Ok(()) } - pub fn invoice_params_received(&mut self) -> Result<(), LightningError> { - self.state = self.state.invoice_params_received()?; + pub fn invoice_params_received(&mut self, client_trusts_lsp: bool, jit_channel_scid: String) -> Result<(), LightningError> { + self.state = self.state.invoice_params_received(client_trusts_lsp, jit_channel_scid)?; Ok(()) } } @@ -602,6 +602,13 @@ where Some(inner_state_lock) => { let mut peer_state = inner_state_lock.lock().unwrap(); if let Some(jit_channel) = peer_state.outbound_channels_by_scid.get_mut(&scid) { + + // TODO: Need to support MPP payments. If payment_amount_msat is known, needs to queue intercepted HTLCs in a map by payment_hash + // LiquidityManager will need to be regularly polled so it can continually check if the payment amount has been received + // and can release the payment or if the channel valid_until has expired and should be failed. + // Can perform check each time HTLC is received and on interval? I guess interval only needs to check expiration as + // we can only reach threshold when htlc is intercepted. + match jit_channel .htlc_intercepted(expected_outbound_amount_msat, intercept_id) { @@ -910,6 +917,11 @@ where fn handle_buy_request( &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest, ) -> Result<(), LightningError> { + // TODO: need to perform check on `params.version`. + // TODO: if payment_size_msat is specified, make sure opening_fee is >= payment_size_msat. + // TODO: if payment_size_msat is specified, make sure opening_fee does not hit overflow error. + // TODO: if payment_size_msat is specified, make sure our node has sufficient incoming liquidity from public network to receive it. + if params.opening_fee_params.is_valid(&self.promise_secret) { let mut outer_state_lock = self.per_peer_state.write().unwrap(); let inner_state_lock = outer_state_lock @@ -957,7 +969,7 @@ where action: ErrorAction::IgnoreAndLog(Level::Info), })?; - if let Err(e) = jit_channel.invoice_params_received() { + if let Err(e) = jit_channel.invoice_params_received(result.client_trusts_lsp, result.jit_channel_scid.clone()) { peer_state.remove_inbound_channel(jit_channel_id); return Err(e); } diff --git a/src/jit_channel/msgs.rs b/src/jit_channel/msgs.rs index 167eef0..98ed55a 100644 --- a/src/jit_channel/msgs.rs +++ b/src/jit_channel/msgs.rs @@ -85,6 +85,7 @@ pub struct OpeningFeeParams { impl OpeningFeeParams { /// Determine that these parameters are valid given the secret used to generate the promise. + // TODO: add validation check that valid_until >= now() pub fn is_valid(&self, promise_secret: &[u8; 32]) -> bool { let mut hmac = HmacEngine::::new(promise_secret); hmac.input(&self.min_fee_msat.to_be_bytes());