From e2af65e085a648e94e64f118d264f56b80930d46 Mon Sep 17 00:00:00 2001 From: Park Juhyung Date: Wed, 8 Apr 2020 15:01:45 +0900 Subject: [PATCH 1/2] Disconnect a socket If Foundry meets EOF while reading the socket --- network/src/p2p/handler.rs | 33 +++++++++++++++++++++++++++++---- network/src/stream.rs | 8 +++++++- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/network/src/p2p/handler.rs b/network/src/p2p/handler.rs index bff1c03f68..511bacdb10 100644 --- a/network/src/p2p/handler.rs +++ b/network/src/p2p/handler.rs @@ -20,6 +20,7 @@ use super::connection::{ use super::listener::Listener; use super::{NegotiationMessage, NetworkMessage}; use crate::client::Client; +use crate::p2p::connection::Error as P2PConnectionError; use crate::session::Session; use crate::stream::Stream; use crate::{FiltersControl, NodeId, RoutingTable, SocketAddr}; @@ -670,7 +671,13 @@ impl IoHandler for Handler { io.update_registration(stream_token); } }); - match con.receive()? { + let received = con.receive(); + if let Err(P2PConnectionError::IoError(ioerr)) = &received { + if ioerr.kind() == std::io::ErrorKind::ConnectionAborted { + io.deregister_stream(stream_token); + } + }; + match received? { Some(NetworkMessage::Extension(msg)) => { let remote_node_id = *self.remote_node_ids.read().get(&stream_token).unwrap_or_else(|| { unreachable!("Node id for {}:{} must exist", stream_token, con.peer_addr()) @@ -739,7 +746,13 @@ impl IoHandler for Handler { io.update_registration(stream_token); } }); - match con.receive()? { + let received = con.receive(); + if let Err(P2PConnectionError::IoError(ioerr)) = &received { + if ioerr.kind() == std::io::ErrorKind::ConnectionAborted { + io.deregister_stream(stream_token); + } + }; + match received? { Some(NetworkMessage::Extension(msg)) => { let remote_node_id = *self.remote_node_ids.read().get(&stream_token).unwrap_or_else(|| { unreachable!("Node id for {}:{} must exist", stream_token, con.peer_addr()) @@ -784,7 +797,13 @@ impl IoHandler for Handler { io.update_registration(stream_token); } }); - match con.receive()? { + let received = con.receive(); + if let Err(P2PConnectionError::IoError(ioerr)) = &received { + if ioerr.kind() == std::io::ErrorKind::ConnectionAborted { + io.deregister_stream(stream_token); + } + }; + match received? { Some(OutgoingMessage::Sync1 { initiator_pub_key, network_id, @@ -879,7 +898,13 @@ impl IoHandler for Handler { } }); let from = *con.peer_addr(); - match con.receive()? { + let received = con.receive(); + if let Err(P2PConnectionError::IoError(ioerr)) = &received { + if ioerr.kind() == std::io::ErrorKind::ConnectionAborted { + io.deregister_stream(stream_token); + } + }; + match received? { Some(IncomingMessage::Ack { recipient_pub_key, encrypted_nonce, diff --git a/network/src/stream.rs b/network/src/stream.rs index e96b280ee6..8043d589e1 100644 --- a/network/src/stream.rs +++ b/network/src/stream.rs @@ -109,6 +109,9 @@ impl TryStream { assert!(read_size < len_of_len, "{} should be less than {}", read_size, len_of_len); if let Some(new_read_size) = self.stream.try_read(&mut bytes[(1 + read_size)..=len_of_len])? { + if new_read_size == 0 { + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "EOF")) + } read_size += new_read_size; }; if len_of_len == read_size { @@ -128,7 +131,7 @@ impl TryStream { if let Some(read_size) = self.stream.try_read(&mut bytes)? { if read_size == 0 { - return Ok(None) + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "EOF").into()) } debug_assert_eq!(1, read_size); if 0xf8 <= bytes[0] { @@ -184,6 +187,9 @@ impl TryStream { while remain_length != 0 { let to_be_read = ::std::cmp::min(remain_length, 1024); if let Some(read_size) = self.stream.try_read(&mut bytes[0..to_be_read])? { + if read_size == 0 { + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "EOF").into()) + } result.extend_from_slice(&bytes[..read_size]); debug_assert!(remain_length >= read_size); remain_length -= read_size; From 2cc93fbc07f3a181a4369ff4995f0205ea1494a2 Mon Sep 17 00:00:00 2001 From: Park Juhyung Date: Wed, 8 Apr 2020 18:07:10 +0900 Subject: [PATCH 2/2] Disconnect a connection if the send call returns BrokenPipe error --- network/src/p2p/handler.rs | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/network/src/p2p/handler.rs b/network/src/p2p/handler.rs index 511bacdb10..6a69cb5dce 100644 --- a/network/src/p2p/handler.rs +++ b/network/src/p2p/handler.rs @@ -941,32 +941,56 @@ impl IoHandler for Handler { Ok(()) } - fn stream_writable(&self, _io: &IoContext, stream: StreamToken) -> IoHandlerResult<()> { + fn stream_writable(&self, io: &IoContext, stream: StreamToken) -> IoHandlerResult<()> { match stream { FIRST_INBOUND..=LAST_INBOUND => { if let Some(con) = self.inbound_connections.write().get_mut(&stream) { - con.flush()?; + let flush_result = con.flush(); + if let Err(P2PConnectionError::IoError(io_error)) = &flush_result { + if io_error.kind() == std::io::ErrorKind::BrokenPipe { + io.deregister_stream(stream); + } + } + flush_result?; } else { cdebug!(NETWORK, "Invalid inbound token({}) on write", stream); } } FIRST_OUTBOUND..=LAST_OUTBOUND => { if let Some(con) = self.outbound_connections.write().get_mut(&stream) { - con.flush()?; + let flush_result = con.flush(); + if let Err(P2PConnectionError::IoError(io_error)) = &flush_result { + if io_error.kind() == std::io::ErrorKind::BrokenPipe { + io.deregister_stream(stream); + } + } + flush_result?; } else { cdebug!(NETWORK, "Invalid outbound token({}) on write", stream); } } FIRST_INCOMING..=LAST_INCOMING => { if let Some(con) = self.incoming_connections.write().get_mut(&stream) { - con.flush()?; + let flush_result = con.flush(); + if let Err(P2PConnectionError::IoError(io_error)) = &flush_result { + if io_error.kind() == std::io::ErrorKind::BrokenPipe { + io.deregister_stream(stream); + } + } + flush_result?; } else { cdebug!(NETWORK, "Invalid incoming token({}) on write", stream); } } FIRST_OUTGOING..=LAST_OUTGOING => { if let Some(con) = self.outgoing_connections.write().get_mut(&stream) { - con.flush()?; + let flush_result = con.flush(); + if let Err(P2PConnectionError::IoError(io_error)) = &flush_result { + if io_error.kind() == std::io::ErrorKind::BrokenPipe { + io.deregister_stream(stream); + } + } + flush_result?; } else { cdebug!(NETWORK, "Invalid outgoing token({}) on write", stream); }