Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions dash-spv/src/client/sync_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,16 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager, H: EventHandler>
/// continuous network monitoring, and calls `stop()` before returning.
pub async fn run(&self, token: CancellationToken) -> Result<()> {
let handler = self.event_handler.clone();

if let Err(e) = self.start().await {
handler.on_error(&e.to_string());
return Err(e);
}

tracing::info!("Starting continuous network monitoring...");

let monitor_shutdown = CancellationToken::new();
let (monitor_failure_tx, mut monitor_failure_rx) = mpsc::channel::<String>(1);

// Subscribe to channels
// Subscribe and spawn monitors before startup so we don't miss early
// connection events.
let sync_event_rx = self.subscribe_sync_events().await;
let network_event_rx = self.subscribe_network_events().await;
let progress_rx = self.subscribe_progress().await;
let wallet_event_rx = self.wallet.read().await.subscribe_events();

// Spawn monitoring tasks
let sync_task = spawn_broadcast_monitor(
"Sync event",
sync_event_rx,
Expand Down Expand Up @@ -83,6 +75,15 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager, H: EventHandler>
monitor_failure_tx,
);

if let Err(e) = self.start().await {
monitor_shutdown.cancel();
let _ = tokio::join!(sync_task, network_task, wallet_task, progress_task);
handler.on_error(&e.to_string());
return Err(e);
}

tracing::info!("Starting continuous network monitoring...");

// Run the sync loop
let mut sync_coordinator_tick_interval = tokio::time::interval(SYNC_COORDINATOR_TICK_MS);

Expand Down
Loading