From 94cc2dca86ea6b96ca12a921af36451175c0984e Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Jan 2025 15:04:33 +0100 Subject: [PATCH 1/3] remove unnecessary arc clone --- src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index c235d2a88..b20101455 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -259,7 +259,6 @@ impl Node { return; } _ = interval.tick() => { - let gossip_sync_logger = Arc::clone(&gossip_sync_logger); let now = Instant::now(); match gossip_source.update_rgs_snapshot().await { Ok(updated_timestamp) => { From 49502cff8663b023d4f7ed5c0cf94e6330a24f2d Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Jan 2025 14:10:03 +0100 Subject: [PATCH 2/3] add functionality to periodically update routing scores from an external http source --- bindings/ldk_node.udl | 2 + src/builder.rs | 44 ++++++++++++++---- src/config.rs | 6 +++ src/lib.rs | 24 ++++++++++ src/scoring.rs | 104 ++++++++++++++++++++++++++++++++++++++++++ src/types.rs | 5 +- 6 files changed, 175 insertions(+), 10 deletions(-) create mode 100644 src/scoring.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index bd1e4fc43..9da0d89b6 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -81,6 +81,7 @@ interface Builder { void set_chain_source_bitcoind_rest(string rest_host, u16 rest_port, string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); + void set_pathfinding_scores_source(string url); void set_liquidity_source_lsps1(PublicKey node_id, SocketAddress address, string? token); void set_liquidity_source_lsps2(PublicKey node_id, SocketAddress address, string? token); void set_storage_dir_path(string storage_dir_path); @@ -330,6 +331,7 @@ dictionary NodeStatus { u64? latest_onchain_wallet_sync_timestamp; u64? latest_fee_rate_cache_update_timestamp; u64? latest_rgs_snapshot_timestamp; + u64? latest_pathfinding_scores_sync_timestamp; u64? latest_node_announcement_broadcast_timestamp; u32? latest_channel_monitor_archival_height; }; diff --git a/src/builder.rs b/src/builder.rs index b4a146e7c..af2d92463 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -27,7 +27,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; use lightning::routing::gossip::NodeAlias; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ - ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters, + CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters, + ProbabilisticScoringFeeParameters, }; use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::persist::{ @@ -110,6 +111,11 @@ enum GossipSourceConfig { RapidGossipSync(String), } +#[derive(Debug, Clone)] +struct PathfindingScoresSyncConfig { + url: String, +} + #[derive(Debug, Clone, Default)] struct LiquiditySourceConfig { // Act as an LSPS1 client connecting to the given service. @@ -243,6 +249,7 @@ pub struct NodeBuilder { log_writer_config: Option, async_payments_role: Option, runtime_handle: Option, + pathfinding_scores_sync_config: Option, } impl NodeBuilder { @@ -260,6 +267,7 @@ impl NodeBuilder { let liquidity_source_config = None; let log_writer_config = None; let runtime_handle = None; + let pathfinding_scores_sync_config = None; Self { config, entropy_source_config, @@ -269,6 +277,7 @@ impl NodeBuilder { log_writer_config, runtime_handle, async_payments_role: None, + pathfinding_scores_sync_config, } } @@ -411,6 +420,14 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its external scores from the given URL. + /// + /// The external scores are merged into the local scoring system to improve routing. + pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self { + self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url }); + self + } + /// Configures the [`Node`] instance to source inbound liquidity from the given /// [bLIP-51 / LSPS1] service. /// @@ -718,6 +735,7 @@ impl NodeBuilder { self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), + self.pathfinding_scores_sync_config.as_ref(), self.async_payments_role, seed_bytes, runtime, @@ -751,6 +769,7 @@ impl NodeBuilder { self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), + self.pathfinding_scores_sync_config.as_ref(), self.async_payments_role, seed_bytes, runtime, @@ -910,6 +929,13 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url); } + /// Configures the [`Node`] instance to source its external scores from the given URL. + /// + /// The external scores are merged into the local scoring system to improve routing. + pub fn set_pathfinding_scores_source(&self, url: String) { + self.inner.write().unwrap().set_pathfinding_scores_source(url); + } + /// Configures the [`Node`] instance to source inbound liquidity from the given /// [bLIP-51 / LSPS1] service. /// @@ -1110,6 +1136,7 @@ fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, + pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, async_payments_role: Option, seed_bytes: [u8; 64], runtime: Arc, logger: Arc, kv_store: Arc, ) -> Result { @@ -1365,26 +1392,24 @@ fn build_with_store_internal( }, }; - let scorer = match io::utils::read_scorer( + let local_scorer = match io::utils::read_scorer( Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger), ) { - Ok(scorer) => Arc::new(Mutex::new(scorer)), + Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { let params = ProbabilisticScoringDecayParameters::default(); - Arc::new(Mutex::new(ProbabilisticScorer::new( - params, - Arc::clone(&network_graph), - Arc::clone(&logger), - ))) + ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger)) } else { return Err(BuildError::ReadFailed); } }, }; + let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); + let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), @@ -1716,6 +1741,8 @@ fn build_with_store_internal( let (background_processor_stop_sender, _) = tokio::sync::watch::channel(()); let is_running = Arc::new(RwLock::new(false)); + let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone()); + Ok(Node { runtime, stop_sender, @@ -1734,6 +1761,7 @@ fn build_with_store_internal( keys_manager, network_graph, gossip_source, + pathfinding_scores_sync_url, liquidity_source, kv_store, logger, diff --git a/src/config.rs b/src/config.rs index d221dd6c3..ce361c45a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -63,6 +63,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(60); // The time in-between RGS sync attempts. pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); +// The time in-between external scores sync attempts. +pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); + // The time in-between node announcement broadcast attempts. pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60); @@ -93,6 +96,9 @@ pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; /// The length in bytes of our wallets' keys seed. pub const WALLET_KEYS_SEED_LEN: usize = 64; +// The timeout after which we abort a external scores sync operation. +pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5; + #[derive(Debug, Clone)] /// Represents the configuration of an [`Node`] instance. /// diff --git a/src/lib.rs b/src/lib.rs index b20101455..253a584f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -95,6 +95,7 @@ mod message_handler; pub mod payment; mod peer_store; mod runtime; +mod scoring; mod tx_broadcaster; mod types; mod wallet; @@ -104,6 +105,7 @@ use std::net::ToSocketAddrs; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use crate::scoring::setup_background_pathfinding_scores_sync; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; use bitcoin::secp256k1::PublicKey; #[cfg(feature = "uniffi")] @@ -154,6 +156,7 @@ use types::{ pub use types::{ ChannelDetails, CustomTlvRecord, DynStore, PeerDetails, SyncAndAsyncKVStore, UserChannelId, }; + pub use { bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio, vss_client, @@ -183,6 +186,7 @@ pub struct Node { keys_manager: Arc, network_graph: Arc, gossip_source: Arc, + pathfinding_scores_sync_url: Option, liquidity_source: Option>>>, kv_store: Arc, logger: Arc, @@ -290,6 +294,18 @@ impl Node { }); } + if let Some(pathfinding_scores_sync_url) = self.pathfinding_scores_sync_url.as_ref() { + setup_background_pathfinding_scores_sync( + pathfinding_scores_sync_url.clone(), + Arc::clone(&self.scorer), + Arc::clone(&self.node_metrics), + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + Arc::clone(&self.runtime), + self.stop_sender.subscribe(), + ); + } + if let Some(listening_addresses) = &self.config.listening_addresses { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); @@ -691,6 +707,8 @@ impl Node { locked_node_metrics.latest_fee_rate_cache_update_timestamp; let latest_rgs_snapshot_timestamp = locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64); + let latest_pathfinding_scores_sync_timestamp = + locked_node_metrics.latest_pathfinding_scores_sync_timestamp; let latest_node_announcement_broadcast_timestamp = locked_node_metrics.latest_node_announcement_broadcast_timestamp; let latest_channel_monitor_archival_height = @@ -703,6 +721,7 @@ impl Node { latest_onchain_wallet_sync_timestamp, latest_fee_rate_cache_update_timestamp, latest_rgs_snapshot_timestamp, + latest_pathfinding_scores_sync_timestamp, latest_node_announcement_broadcast_timestamp, latest_channel_monitor_archival_height, } @@ -1530,6 +1549,8 @@ pub struct NodeStatus { /// /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet. pub latest_rgs_snapshot_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores. + pub latest_pathfinding_scores_sync_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node /// announcement. /// @@ -1548,6 +1569,7 @@ pub(crate) struct NodeMetrics { latest_onchain_wallet_sync_timestamp: Option, latest_fee_rate_cache_update_timestamp: Option, latest_rgs_snapshot_timestamp: Option, + latest_pathfinding_scores_sync_timestamp: Option, latest_node_announcement_broadcast_timestamp: Option, latest_channel_monitor_archival_height: Option, } @@ -1559,6 +1581,7 @@ impl Default for NodeMetrics { latest_onchain_wallet_sync_timestamp: None, latest_fee_rate_cache_update_timestamp: None, latest_rgs_snapshot_timestamp: None, + latest_pathfinding_scores_sync_timestamp: None, latest_node_announcement_broadcast_timestamp: None, latest_channel_monitor_archival_height: None, } @@ -1567,6 +1590,7 @@ impl Default for NodeMetrics { impl_writeable_tlv_based!(NodeMetrics, { (0, latest_lightning_wallet_sync_timestamp, option), + (1, latest_pathfinding_scores_sync_timestamp, option), (2, latest_onchain_wallet_sync_timestamp, option), (4, latest_fee_rate_cache_update_timestamp, option), (6, latest_rgs_snapshot_timestamp, option), diff --git a/src/scoring.rs b/src/scoring.rs new file mode 100644 index 000000000..e244ab258 --- /dev/null +++ b/src/scoring.rs @@ -0,0 +1,104 @@ +use std::{ + io::Cursor, + sync::{Arc, Mutex, RwLock}, + time::{Duration, SystemTime}, +}; + +use crate::{ + config::{ + EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS, + }, + logger::LdkLogger, + runtime::Runtime, + NodeMetrics, Scorer, +}; +use crate::{write_node_metrics, DynStore, Logger}; +use lightning::{ + log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable, +}; + +/// Start a background task that periodically downloads scores via an external url and merges them into the local +/// pathfinding scores. +pub fn setup_background_pathfinding_scores_sync( + url: String, scorer: Arc>, node_metrics: Arc>, + kv_store: Arc, logger: Arc, runtime: Arc, + mut stop_receiver: tokio::sync::watch::Receiver<()>, +) { + log_info!(logger, "External scores background syncing enabled from {}", url); + + let logger = Arc::clone(&logger); + + runtime.spawn_background_processor_task(async move { + let mut interval = tokio::time::interval(EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL); + loop { + tokio::select! { + _ = stop_receiver.changed() => { + log_trace!( + logger, + "Stopping background syncing external scores.", + ); + return; + } + _ = interval.tick() => { + log_trace!( + logger, + "Background sync of external scores started.", + ); + + sync_external_scores(logger.as_ref(), scorer.as_ref(), node_metrics.as_ref(), Arc::clone(&kv_store), &url).await; + } + } + } + }); +} + +async fn sync_external_scores( + logger: &Logger, scorer: &Mutex, node_metrics: &RwLock, + kv_store: Arc, url: &String, +) -> () { + let response = tokio::time::timeout( + Duration::from_secs(EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS), + reqwest::get(url), + ) + .await; + + let response = match response { + Ok(resp) => resp, + Err(e) => { + log_error!(logger, "Retrieving external scores timed out: {}", e); + return; + }, + }; + let response = match response { + Ok(resp) => resp, + Err(e) => { + log_error!(logger, "Failed to retrieve external scores update: {}", e); + return; + }, + }; + let body = match response.bytes().await { + Ok(bytes) => bytes, + Err(e) => { + log_error!(logger, "Failed to read external scores update: {}", e); + return; + }, + }; + let mut reader = Cursor::new(body); + match ChannelLiquidities::read(&mut reader) { + Ok(liquidities) => { + let duration_since_epoch = + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + scorer.lock().unwrap().merge(liquidities, duration_since_epoch); + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_pathfinding_scores_sync_timestamp = + Some(duration_since_epoch.as_secs()); + write_node_metrics(&*locked_node_metrics, kv_store, logger).unwrap_or_else(|e| { + log_error!(logger, "Persisting node metrics failed: {}", e); + }); + log_trace!(logger, "External scores merged successfully"); + }, + Err(e) => { + log_error!(logger, "Failed to parse external scores update: {}", e); + }, + } +} diff --git a/src/types.rs b/src/types.rs index 2fc1c6488..800d9462d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -17,7 +17,8 @@ use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::types::ChannelId; use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; +use lightning::routing::scoring::CombinedScorer; +use lightning::routing::scoring::ProbabilisticScoringFeeParameters; use lightning::sign::InMemorySigner; use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister}; use lightning::util::ser::{Readable, Writeable, Writer}; @@ -114,7 +115,7 @@ pub(crate) type Router = DefaultRouter< ProbabilisticScoringFeeParameters, Scorer, >; -pub(crate) type Scorer = ProbabilisticScorer, Arc>; +pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>; From 52705d333ca67c36cc9f1473fdda0b23088495d5 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 10 Feb 2025 14:07:00 +0100 Subject: [PATCH 3/3] cache external pathfinding scores Save external pathfinding scores in a cache so that they will be available immediately after a node restart. Otherwise there might be a time window where new scores need to be downloaded still and the node operates on local data only. --- src/builder.rs | 19 ++++++++++++++++- src/io/utils.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++++-- src/scoring.rs | 18 +++++++++++++--- 3 files changed, 86 insertions(+), 6 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index af2d92463..f3a57e085 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -24,6 +24,7 @@ use lightning::io::Cursor; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; +use lightning::log_trace; use lightning::routing::gossip::NodeAlias; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ @@ -51,7 +52,9 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; -use crate::io::utils::{read_node_metrics, write_node_metrics}; +use crate::io::utils::{ + read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics, +}; use crate::io::vss_store::VssStore; use crate::io::{ self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, @@ -1410,6 +1413,20 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); + // Restore external pathfinding scores from cache if possible. + match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) { + Ok(external_scores) => { + scorer.lock().unwrap().merge(external_scores, cur_time); + log_trace!(logger, "External scores from cache merged successfully"); + }, + Err(e) => { + if e.kind() != std::io::ErrorKind::NotFound { + log_error!(logger, "Error while reading external scores from cache: {}", e); + return Err(BuildError::ReadFailed); + } + }, + } + let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), diff --git a/src/io/utils.rs b/src/io/utils.rs index 1556314c4..6fb672e36 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -22,9 +22,11 @@ use bitcoin::Network; use lightning::io::Cursor; use lightning::ln::msgs::DecodeError; use lightning::routing::gossip::NetworkGraph; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; +use lightning::routing::scoring::{ + ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters, +}; use lightning::util::persist::{ - KVStoreSync, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, + KVStore, KVStoreSync, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, @@ -48,6 +50,8 @@ use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; use crate::{Error, EventQueue, NodeMetrics, PaymentDetails}; +pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache"; + /// Generates a random [BIP 39] mnemonic. /// /// The result may be used to initialize the [`Node`] entropy, i.e., can be given to @@ -164,6 +168,53 @@ where }) } +/// Read previously persisted external pathfinding scores from the cache. +pub(crate) fn read_external_pathfinding_scores_from_cache( + kv_store: Arc, logger: L, +) -> Result +where + L::Target: LdkLogger, +{ + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + )?); + ChannelLiquidities::read(&mut reader).map_err(|e| { + log_error!(logger, "Failed to deserialize scorer: {}", e); + std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") + }) +} + +/// Persist external pathfinding scores to the cache. +pub(crate) async fn write_external_pathfinding_scores_to_cache( + kv_store: Arc, data: &ChannelLiquidities, logger: L, +) -> Result<(), Error> +where + L::Target: LdkLogger, +{ + KVStore::write( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + data.encode(), + ) + .await + .map_err(|e| { + log_error!( + logger, + "Writing data to key {}/{}/{} failed due to: {}", + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + e + ); + Error::PersistenceFailed + }) +} + /// Read previously persisted events from the store. pub(crate) fn read_event_queue( kv_store: Arc, logger: L, diff --git a/src/scoring.rs b/src/scoring.rs index e244ab258..107f63f65 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -8,6 +8,7 @@ use crate::{ config::{ EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS, }, + io::utils::write_external_pathfinding_scores_to_cache, logger::LdkLogger, runtime::Runtime, NodeMetrics, Scorer, @@ -86,15 +87,26 @@ async fn sync_external_scores( let mut reader = Cursor::new(body); match ChannelLiquidities::read(&mut reader) { Ok(liquidities) => { + if let Err(e) = write_external_pathfinding_scores_to_cache( + Arc::clone(&kv_store), + &liquidities, + logger, + ) + .await + { + log_error!(logger, "Failed to persist external scores to cache: {}", e); + } + let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); scorer.lock().unwrap().merge(liquidities, duration_since_epoch); let mut locked_node_metrics = node_metrics.write().unwrap(); locked_node_metrics.latest_pathfinding_scores_sync_timestamp = Some(duration_since_epoch.as_secs()); - write_node_metrics(&*locked_node_metrics, kv_store, logger).unwrap_or_else(|e| { - log_error!(logger, "Persisting node metrics failed: {}", e); - }); + write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), logger) + .unwrap_or_else(|e| { + log_error!(logger, "Persisting node metrics failed: {}", e); + }); log_trace!(logger, "External scores merged successfully"); }, Err(e) => {