From ec9a3dc3bc36b63b7d3c4f51c81f413fce5a2bb1 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Fri, 19 Jun 2020 16:16:46 +0200 Subject: [PATCH 1/2] Allow users to `small_network` to shut down the server explicitly for ensure the port is usable again --- src/components/small_network.rs | 41 ++++++++++++++++++++++++++++----- src/reactor.rs | 6 +++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/components/small_network.rs b/src/components/small_network.rs index 016e5513e9..a3bca5c0ca 100644 --- a/src/components/small_network.rs +++ b/src/components/small_network.rs @@ -77,6 +77,7 @@ use tokio::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, oneshot, }, + task::JoinHandle, }; use tokio_openssl::SslStream; use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed}; @@ -125,7 +126,10 @@ pub(crate) struct SmallNetwork { // Note: This channel never sends anything, instead it is closed when `SmallNetwork` is dropped, // signalling the receiver that it should cease operation. Don't listen to clippy! #[allow(dead_code)] - shutdown: oneshot::Sender<()>, + shutdown: Option>, + /// Join handle for the server thread. + #[allow(dead_code)] + server_join_handle: Option>, } impl SmallNetwork @@ -167,15 +171,16 @@ where let our_fingerprint = our_endpoint.cert().public_key_fingerprint(); // Run the server task. + // We spawn it ourselves instead of through an effect to get a hold of the join handle, + // which we need to shutdown cleanly later on. info!(%our_endpoint, "starting server background task"); let (server_shutdown_sender, server_shutdown_receiver) = oneshot::channel(); - let mut effects = server_task( + let server_join_handle = tokio::spawn(server_task( event_queue, tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?, server_shutdown_receiver, - ) - .boxed() - .ignore(); + )); + let model = SmallNetwork { cfg, signed_endpoints: hashmap! { our_fingerprint => Signed::new(&our_endpoint, &private_key)? }, @@ -184,10 +189,12 @@ where private_key: Arc::new(private_key), event_queue, outgoing: HashMap::new(), - shutdown: server_shutdown_sender, + shutdown: Some(server_shutdown_sender), + server_join_handle: Some(server_join_handle), }; // Connect to the root node if we are not the root node. + let mut effects: Multiple<_> = Default::default(); if !we_are_root { effects.extend(model.connect_to_root()); } else { @@ -197,6 +204,28 @@ where Ok((model, effects)) } + /// Close down the listening server socket. + /// + /// Signals that the background process that runs the server should shutdown completely and + /// waits for it to complete the shutdown. This explicitly allows the background task to finish + /// and drop everything it owns, ensuring that resources such as allocated ports are free to be + /// reused once this completes. + #[allow(dead_code)] + async fn shutdown_server(&mut self) { + // Close the shutdown socket, causing the server to exit. + drop(self.shutdown.take()); + + // Wait for the server to exit cleanly. + if let Some(join_handle) = self.server_join_handle.take() { + match join_handle.await { + Ok(_) => debug!("server exited cleanly"), + Err(err) => error!(%err, "could not join server task cleanly"), + } + } else { + warn!("server shutdown while already shut down") + } + } + /// Attempts to connect to the root node. fn connect_to_root(&self) -> Multiple>> { connect_trusted( diff --git a/src/reactor.rs b/src/reactor.rs index 47059c733d..289411e8e4 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -223,6 +223,12 @@ where pub fn reactor(&self) -> &R { &self.reactor } + + /// Deconstructs the runner to return the reactor. + #[inline] + pub fn into_inner(self) -> R { + self.reactor + } } /// Spawns tasks that will process the given effects. From 4caa25ec46891d9efc8fd9d325275964f7cf29a0 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Fri, 19 Jun 2020 16:17:24 +0200 Subject: [PATCH 2/2] Remove cool down timers in favor of waiting for the socket to be dropped --- src/components/small_network/test.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/components/small_network/test.rs b/src/components/small_network/test.rs index 7331872dad..b45198353e 100644 --- a/src/components/small_network/test.rs +++ b/src/components/small_network/test.rs @@ -29,10 +29,6 @@ use tracing::{debug, dispatcher::DefaultGuard, info}; /// Time interval for which to poll an observed testing network when no events have occured. const POLL_INTERVAL: Duration = Duration::from_millis(10); -/// Amount of time to wait after shutting down all nodes to give the OS networking stack time to -/// catch up. -const NET_COOLDOWN: Duration = Duration::from_millis(100); - /// The networking port used by the tests for the root node. const TEST_ROOT_NODE_PORT: u16 = 11223; @@ -208,11 +204,12 @@ impl Network { /// Usually dropping is enough, but when attempting to reusing listening ports immediately, this /// gets the job done. async fn shutdown(self) { - drop(self); + // Shutdown the sender of every reactor node to ensure the port is open again. + for node in self.nodes.into_iter() { + node.into_inner().net.shutdown_server().await; + } - debug!("dropped network, waiting for connections to terminate"); - tokio::time::delay_for(NET_COOLDOWN).await; - debug!("finished waiting for connections to terminate"); + debug!("shut down network"); } }