Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions src/components/small_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use tokio::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot,
},
task::JoinHandle,
};
use tokio_openssl::SslStream;
use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed};
Expand Down Expand Up @@ -125,7 +126,10 @@ pub(crate) struct SmallNetwork<REv: 'static, P> {
// Note: This channel never sends anything, instead it is closed when `SmallNetwork` is dropped,
// signalling the receiver that it should cease operation. Don't listen to clippy!
#[allow(dead_code)]
shutdown: oneshot::Sender<()>,
shutdown: Option<oneshot::Sender<()>>,
/// Join handle for the server thread.
#[allow(dead_code)]
server_join_handle: Option<JoinHandle<()>>,
}

impl<REv, P> SmallNetwork<REv, P>
Expand Down Expand Up @@ -167,15 +171,16 @@ where
let our_fingerprint = our_endpoint.cert().public_key_fingerprint();

// Run the server task.
// We spawn it ourselves instead of through an effect to get a hold of the join handle,
// which we need to shutdown cleanly later on.
info!(%our_endpoint, "starting server background task");
let (server_shutdown_sender, server_shutdown_receiver) = oneshot::channel();
let mut effects = server_task(
let server_join_handle = tokio::spawn(server_task(
event_queue,
tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?,
server_shutdown_receiver,
)
.boxed()
.ignore();
));

let model = SmallNetwork {
cfg,
signed_endpoints: hashmap! { our_fingerprint => Signed::new(&our_endpoint, &private_key)? },
Expand All @@ -184,10 +189,12 @@ where
private_key: Arc::new(private_key),
event_queue,
outgoing: HashMap::new(),
shutdown: server_shutdown_sender,
shutdown: Some(server_shutdown_sender),
server_join_handle: Some(server_join_handle),
};

// Connect to the root node if we are not the root node.
let mut effects: Multiple<_> = Default::default();
if !we_are_root {
effects.extend(model.connect_to_root());
} else {
Expand All @@ -197,6 +204,28 @@ where
Ok((model, effects))
}

/// Close down the listening server socket.
///
/// Signals that the background process that runs the server should shutdown completely and
/// waits for it to complete the shutdown. This explicitly allows the background task to finish
/// and drop everything it owns, ensuring that resources such as allocated ports are free to be
/// reused once this completes.
#[allow(dead_code)]
async fn shutdown_server(&mut self) {
// Close the shutdown socket, causing the server to exit.
drop(self.shutdown.take());

// Wait for the server to exit cleanly.
if let Some(join_handle) = self.server_join_handle.take() {
match join_handle.await {
Ok(_) => debug!("server exited cleanly"),
Err(err) => error!(%err, "could not join server task cleanly"),
}
} else {
warn!("server shutdown while already shut down")
}
}

/// Attempts to connect to the root node.
fn connect_to_root(&self) -> Multiple<Effect<Event<P>>> {
connect_trusted(
Expand Down
13 changes: 5 additions & 8 deletions src/components/small_network/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ use tracing::{debug, dispatcher::DefaultGuard, info};
/// Time interval for which to poll an observed testing network when no events have occured.
const POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Amount of time to wait after shutting down all nodes to give the OS networking stack time to
/// catch up.
const NET_COOLDOWN: Duration = Duration::from_millis(100);

/// The networking port used by the tests for the root node.
const TEST_ROOT_NODE_PORT: u16 = 11223;

Expand Down Expand Up @@ -208,11 +204,12 @@ impl Network {
/// Usually dropping is enough, but when attempting to reusing listening ports immediately, this
/// gets the job done.
async fn shutdown(self) {
drop(self);
// Shutdown the sender of every reactor node to ensure the port is open again.
for node in self.nodes.into_iter() {
node.into_inner().net.shutdown_server().await;
}

debug!("dropped network, waiting for connections to terminate");
tokio::time::delay_for(NET_COOLDOWN).await;
debug!("finished waiting for connections to terminate");
debug!("shut down network");
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ where
pub fn reactor(&self) -> &R {
&self.reactor
}

/// Deconstructs the runner to return the reactor.
#[inline]
pub fn into_inner(self) -> R {
self.reactor
}
}

/// Spawns tasks that will process the given effects.
Expand Down