diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..e69366f --- /dev/null +++ b/src/events.rs @@ -0,0 +1,58 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! Events are surfaced by the library to indicate some action must be taken +//! by the end-user. +//! +//! Because we don't have a built-in runtime, it's up to the end-user to poll +//! [`crate::LiquidityManager::get_and_clear_pending_events()`] to receive events. + +use std::collections::VecDeque; +use std::sync::{Condvar, Mutex}; + +#[derive(Default)] +pub(crate) struct EventQueue { + queue: Mutex>, + condvar: Condvar, +} + +impl EventQueue { + pub fn enqueue(&self, event: Event) { + { + let mut queue = self.queue.lock().unwrap(); + queue.push_back(event); + } + + self.condvar.notify_one(); + } + + pub fn wait_next_event(&self) -> Event { + let mut queue = + self.condvar.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap(); + + let event = queue.pop_front().expect("non empty queue"); + let should_notify = !queue.is_empty(); + + drop(queue); + + if should_notify { + self.condvar.notify_one(); + } + + event + } + + pub fn get_and_clear_pending_events(&self) -> Vec { + self.queue.lock().unwrap().drain(..).collect() + } +} + +/// Event which you should probably take some action in response to. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Event {} diff --git a/src/lib.rs b/src/lib.rs index 028f820..4919017 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] mod channel_request; +pub mod events; mod jit_channel; mod transport; mod utils; diff --git a/src/transport/message_handler.rs b/src/transport/message_handler.rs index 6f58583..20681fe 100644 --- a/src/transport/message_handler.rs +++ b/src/transport/message_handler.rs @@ -1,3 +1,4 @@ +use crate::events::{Event, EventQueue}; use crate::transport::msgs::{LSPSMessage, RawLSPSMessage, LSPS_MESSAGE_TYPE}; use crate::transport::protocol::LSPS0MessageHandler; @@ -45,6 +46,7 @@ where ES::Target: EntropySource, { pending_messages: Arc>>, + pending_events: Arc, request_id_to_method_map: Mutex>, lsps0_message_handler: LSPS0MessageHandler, provider_config: Option, @@ -65,12 +67,27 @@ where Self { pending_messages, + pending_events: Arc::new(EventQueue::default()), request_id_to_method_map: Mutex::new(HashMap::new()), lsps0_message_handler, provider_config, } } + /// Blocks until next event is ready and returns it + /// + /// Typically you would spawn a thread or task that calls this in a loop + pub fn wait_next_event(&self) -> Event { + self.pending_events.wait_next_event() + } + + /// Returns and clears all events without blocking + /// + /// Typically you would spawn a thread or task that calls this in a loop + pub fn get_and_clear_pending_events(&self) -> Vec { + self.pending_events.get_and_clear_pending_events() + } + fn handle_lsps_message( &self, msg: LSPSMessage, sender_node_id: &PublicKey, ) -> Result<(), lightning::ln::msgs::LightningError> {