diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index bad467e87..9984f9416 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -304,6 +304,7 @@ impl PeerNetworkManager { } Err(e) => { log::warn!("Handshake failed with {}: {}", addr, e); + // Only clears connecting set. Peer was never added, so no count/event needed. pool.remove_peer(&addr).await; // Update reputation for handshake failure reputation_manager @@ -320,6 +321,7 @@ impl PeerNetworkManager { } Err(e) => { log::debug!("Failed to connect to {}: {}", addr, e); + // Only clears connecting set. Peer was never added, so no count/event needed. pool.remove_peer(&addr).await; // Minor reputation penalty for connection failure reputation_manager @@ -334,6 +336,45 @@ impl PeerNetworkManager { }); } + /// Decrement the connected count and emit PeerDisconnected / PeersUpdated events. + async fn notify_peer_removed( + pool: &PeerPool, + addr: &SocketAddr, + connected_peer_count: &AtomicUsize, + network_event_sender: &broadcast::Sender, + ) { + let sub_result = + connected_peer_count + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| c.checked_sub(1)); + if sub_result.is_err() { + log::warn!("Peer count already zero when removing {}", addr); + } + let count = connected_peer_count.load(Ordering::Relaxed); + let addresses = pool.get_connected_addresses().await; + let best_height = pool.get_best_height().await; + let _ = network_event_sender.send(NetworkEvent::PeerDisconnected { + address: *addr, + }); + let _ = network_event_sender.send(NetworkEvent::PeersUpdated { + connected_count: count, + addresses, + best_height, + }); + } + + /// Remove a peer from the pool, decrement the connected count, and emit + /// PeerDisconnected / PeersUpdated events. + async fn remove_peer_and_notify( + pool: &PeerPool, + addr: &SocketAddr, + connected_peer_count: &AtomicUsize, + network_event_sender: &broadcast::Sender, + ) { + if pool.remove_peer(addr).await.is_some() { + Self::notify_peer_removed(pool, addr, connected_peer_count, network_event_sender).await; + } + } + /// Start reading messages from a peer #[allow(clippy::too_many_arguments)] // TODO: refactor to reduce arguments async fn start_peer_reader( @@ -664,26 +705,15 @@ impl PeerNetworkManager { } } - // Remove from pool + // Remove from pool and notify consumers log::warn!("Disconnecting from {} (peer reader loop ended)", addr); - let removed = pool.remove_peer(&addr).await; - if removed.is_some() { - // Decrement connected peer counter when a peer is removed - connected_peer_count.fetch_sub(1, Ordering::Relaxed); - - // Emit peer disconnected event - let count = connected_peer_count.load(Ordering::Relaxed); - let addresses = pool.get_connected_addresses().await; - let best_height = pool.get_best_height().await; - let _ = network_event_sender.send(NetworkEvent::PeerDisconnected { - address: addr, - }); - let _ = network_event_sender.send(NetworkEvent::PeersUpdated { - connected_count: count, - addresses, - best_height, - }); - } + Self::remove_peer_and_notify( + &pool, + &addr, + &connected_peer_count, + &network_event_sender, + ) + .await; headers2_disabled.lock().await.remove(&addr); @@ -762,8 +792,19 @@ impl PeerNetworkManager { } async fn maintenance_tick(&self) { - // Clean up disconnected peers - self.pool.cleanup_disconnected().await; + // Remove peers that the reader loop failed to clean up. + // This should not trigger under normal operation. + let unhealthy = self.pool.remove_unhealthy().await; + for addr in &unhealthy { + log::warn!("Maintenance removed stale peer {} - reader loop missed cleanup", addr); + Self::notify_peer_removed( + &self.pool, + addr, + &self.connected_peer_count, + &self.network_event_sender, + ) + .await; + } let count = self.pool.peer_count().await; log::debug!("Connected peers: {}", count); @@ -1182,8 +1223,13 @@ impl PeerNetworkManager { pub async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { log::info!("Disconnecting peer {} - reason: {}", addr, reason); - // Remove the peer - self.pool.remove_peer(addr).await; + Self::remove_peer_and_notify( + &self.pool, + addr, + &self.connected_peer_count, + &self.network_event_sender, + ) + .await; Ok(()) } diff --git a/dash-spv/src/network/pool.rs b/dash-spv/src/network/pool.rs index 5ccb1ba11..657d620f7 100644 --- a/dash-spv/src/network/pool.rs +++ b/dash-spv/src/network/pool.rs @@ -159,8 +159,9 @@ impl PeerPool { self.peer_count().await < TARGET_PEERS } - /// Clean up disconnected peers - pub async fn cleanup_disconnected(&self) { + /// Remove unhealthy peers and return their addresses so the caller can + /// emit the appropriate network events. + pub async fn remove_unhealthy(&self) -> Vec { let peers = self.peers.read().await; let mut unhealthy = Vec::new(); @@ -179,14 +180,10 @@ impl PeerPool { // Remove unhealthy connections if !unhealthy.is_empty() { let mut peers = self.peers.write().await; - for addr in unhealthy { - peers.remove(&addr); - log::warn!( - "Cleaned up unhealthy peer: {} (marked unhealthy by health check)", - addr - ); - } + unhealthy.retain(|addr| peers.remove(addr).is_some()); } + + unhealthy } }