Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 69 additions & 23 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ impl PeerNetworkManager {
}
Err(e) => {
log::warn!("Handshake failed with {}: {}", addr, e);
// Only clears connecting set. Peer was never added, so no count/event needed.
pool.remove_peer(&addr).await;
// Update reputation for handshake failure
reputation_manager
Expand All @@ -320,6 +321,7 @@ impl PeerNetworkManager {
}
Err(e) => {
log::debug!("Failed to connect to {}: {}", addr, e);
// Only clears connecting set. Peer was never added, so no count/event needed.
pool.remove_peer(&addr).await;
// Minor reputation penalty for connection failure
reputation_manager
Expand All @@ -334,6 +336,45 @@ impl PeerNetworkManager {
});
}

/// Decrement the connected count and emit PeerDisconnected / PeersUpdated events.
async fn notify_peer_removed(
pool: &PeerPool,
addr: &SocketAddr,
connected_peer_count: &AtomicUsize,
network_event_sender: &broadcast::Sender<NetworkEvent>,
) {
let sub_result =
connected_peer_count
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| c.checked_sub(1));
if sub_result.is_err() {
log::warn!("Peer count already zero when removing {}", addr);
}
let count = connected_peer_count.load(Ordering::Relaxed);
let addresses = pool.get_connected_addresses().await;
let best_height = pool.get_best_height().await;
let _ = network_event_sender.send(NetworkEvent::PeerDisconnected {
address: *addr,
});
let _ = network_event_sender.send(NetworkEvent::PeersUpdated {
connected_count: count,
addresses,
best_height,
});
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Remove a peer from the pool, decrement the connected count, and emit
/// PeerDisconnected / PeersUpdated events.
async fn remove_peer_and_notify(
pool: &PeerPool,
addr: &SocketAddr,
connected_peer_count: &AtomicUsize,
network_event_sender: &broadcast::Sender<NetworkEvent>,
) {
if pool.remove_peer(addr).await.is_some() {
Self::notify_peer_removed(pool, addr, connected_peer_count, network_event_sender).await;
}
}

/// Start reading messages from a peer
#[allow(clippy::too_many_arguments)] // TODO: refactor to reduce arguments
async fn start_peer_reader(
Expand Down Expand Up @@ -664,26 +705,15 @@ impl PeerNetworkManager {
}
}

// Remove from pool
// Remove from pool and notify consumers
log::warn!("Disconnecting from {} (peer reader loop ended)", addr);
let removed = pool.remove_peer(&addr).await;
if removed.is_some() {
// Decrement connected peer counter when a peer is removed
connected_peer_count.fetch_sub(1, Ordering::Relaxed);

// Emit peer disconnected event
let count = connected_peer_count.load(Ordering::Relaxed);
let addresses = pool.get_connected_addresses().await;
let best_height = pool.get_best_height().await;
let _ = network_event_sender.send(NetworkEvent::PeerDisconnected {
address: addr,
});
let _ = network_event_sender.send(NetworkEvent::PeersUpdated {
connected_count: count,
addresses,
best_height,
});
}
Self::remove_peer_and_notify(
&pool,
&addr,
&connected_peer_count,
&network_event_sender,
)
.await;

headers2_disabled.lock().await.remove(&addr);

Expand Down Expand Up @@ -762,8 +792,19 @@ impl PeerNetworkManager {
}

async fn maintenance_tick(&self) {
// Clean up disconnected peers
self.pool.cleanup_disconnected().await;
// Remove peers that the reader loop failed to clean up.
// This should not trigger under normal operation.
let unhealthy = self.pool.remove_unhealthy().await;
for addr in &unhealthy {
log::warn!("Maintenance removed stale peer {} - reader loop missed cleanup", addr);
Self::notify_peer_removed(
&self.pool,
addr,
&self.connected_peer_count,
&self.network_event_sender,
)
.await;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

let count = self.pool.peer_count().await;
log::debug!("Connected peers: {}", count);
Expand Down Expand Up @@ -1182,8 +1223,13 @@ impl PeerNetworkManager {
pub async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> {
log::info!("Disconnecting peer {} - reason: {}", addr, reason);

// Remove the peer
self.pool.remove_peer(addr).await;
Self::remove_peer_and_notify(
&self.pool,
addr,
&self.connected_peer_count,
&self.network_event_sender,
)
.await;

Ok(())
}
Expand Down
15 changes: 6 additions & 9 deletions dash-spv/src/network/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ impl PeerPool {
self.peer_count().await < TARGET_PEERS
}

/// Clean up disconnected peers
pub async fn cleanup_disconnected(&self) {
/// Remove unhealthy peers and return their addresses so the caller can
/// emit the appropriate network events.
pub async fn remove_unhealthy(&self) -> Vec<SocketAddr> {
let peers = self.peers.read().await;
let mut unhealthy = Vec::new();

Expand All @@ -179,14 +180,10 @@ impl PeerPool {
// Remove unhealthy connections
if !unhealthy.is_empty() {
let mut peers = self.peers.write().await;
for addr in unhealthy {
peers.remove(&addr);
log::warn!(
"Cleaned up unhealthy peer: {} (marked unhealthy by health check)",
addr
);
}
unhealthy.retain(|addr| peers.remove(addr).is_some());
}

unhealthy
}
}

Expand Down
Loading