diff --git a/dash-spv/src/client/sync_coordinator.rs b/dash-spv/src/client/sync_coordinator.rs index 1914c99e6..4e2c50287 100644 --- a/dash-spv/src/client/sync_coordinator.rs +++ b/dash-spv/src/client/sync_coordinator.rs @@ -31,24 +31,16 @@ impl /// continuous network monitoring, and calls `stop()` before returning. pub async fn run(&self, token: CancellationToken) -> Result<()> { let handler = self.event_handler.clone(); - - if let Err(e) = self.start().await { - handler.on_error(&e.to_string()); - return Err(e); - } - - tracing::info!("Starting continuous network monitoring..."); - let monitor_shutdown = CancellationToken::new(); let (monitor_failure_tx, mut monitor_failure_rx) = mpsc::channel::(1); - // Subscribe to channels + // Subscribe and spawn monitors before startup so we don't miss early + // connection events. let sync_event_rx = self.subscribe_sync_events().await; let network_event_rx = self.subscribe_network_events().await; let progress_rx = self.subscribe_progress().await; let wallet_event_rx = self.wallet.read().await.subscribe_events(); - // Spawn monitoring tasks let sync_task = spawn_broadcast_monitor( "Sync event", sync_event_rx, @@ -83,6 +75,15 @@ impl monitor_failure_tx, ); + if let Err(e) = self.start().await { + monitor_shutdown.cancel(); + let _ = tokio::join!(sync_task, network_task, wallet_task, progress_task); + handler.on_error(&e.to_string()); + return Err(e); + } + + tracing::info!("Starting continuous network monitoring..."); + // Run the sync loop let mut sync_coordinator_tick_interval = tokio::time::interval(SYNC_COORDINATOR_TICK_MS);