diff --git a/src/main/kotlin/cash/atto/node/network/NetworkProcessor.kt b/src/main/kotlin/cash/atto/node/network/NetworkProcessor.kt index cc1780bc..8f1cb541 100644 --- a/src/main/kotlin/cash/atto/node/network/NetworkProcessor.kt +++ b/src/main/kotlin/cash/atto/node/network/NetworkProcessor.kt @@ -38,6 +38,7 @@ import io.ktor.server.routing.post import io.ktor.server.routing.routing import io.ktor.server.websocket.webSocket import jakarta.annotation.PreDestroy +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.asCoroutineDispatcher @@ -352,6 +353,7 @@ class NetworkProcessor( val connectionSocketAddress = InetSocketAddress(call.request.origin.remoteHost, call.request.origin.remotePort) connectionManager.manage(node, connectionSocketAddress, this) + } catch (_: CancellationException) { } catch (e: Exception) { logger.trace(e) { "Exception during handshake with ${call.request.origin.remoteHost}" } call.respond(HttpStatusCode.InternalServerError) @@ -370,8 +372,8 @@ class NetworkProcessor( clear() httpClient.close() websocketClient.close() - server.stop() scope.cancel() + server.stop() } @Scheduled(fixedRate = 1_000) @@ -403,12 +405,8 @@ class NetworkProcessor( } scope.launch { - try { - logger.trace { "Start connection to $publicUri" } - connection(publicUri) - } catch (e: Exception) { - logger.trace(e) { "Exception during connection to $publicUri" } - } + logger.trace { "Start connection to $publicUri" } + connection(publicUri) } } diff --git a/src/main/kotlin/cash/atto/node/network/NodeConnectionManager.kt b/src/main/kotlin/cash/atto/node/network/NodeConnectionManager.kt index efb57a45..ab6ca578 100644 --- a/src/main/kotlin/cash/atto/node/network/NodeConnectionManager.kt +++ b/src/main/kotlin/cash/atto/node/network/NodeConnectionManager.kt @@ -14,6 +14,7 @@ import io.ktor.websocket.WebSocketSession import io.ktor.websocket.close import io.ktor.websocket.readBytes import jakarta.annotation.PreDestroy +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.asCoroutineDispatcher @@ -105,7 +106,8 @@ class NodeConnectionManager( .onStart { eventPublisher.publish(NodeConnected(connectionSocketAddress, node)) }.onCompletion { - logger.trace(it) { "Inbound message stream from ${node.publicUri} completed" } + val cause = it?.takeUnless { it is CancellationException } + logger.trace(cause) { "Inbound message stream from ${node.publicUri} completed" } connectionMap.remove(publicUri) }.collect { val message = NetworkSerializer.deserialize(it) @@ -133,8 +135,8 @@ class NodeConnectionManager( messagePublisher.publish(networkMessage) } } catch (e: Exception) { - logger.debug(e) { "Exception during inbound message stream from ${node.publicUri}" } connectionMap.remove(publicUri) + throw e } finally { connection.disconnect() } @@ -208,8 +210,9 @@ class NodeConnectionManager( .consumeAsFlow() .filterIsInstance() .onStart { logger.info { "Connected to ${node.publicUri} ${node.publicKey}" } } - .onCompletion { cause -> logger.info(cause) { "Disconnected from ${node.publicUri}" } } - .map { it.readBytes() } + .onCompletion { cause -> + logger.info(cause?.takeUnless { it is CancellationException }) { "Disconnected from ${node.publicUri}" } + }.map { it.readBytes() } suspend fun disconnect() { try { diff --git a/src/test/kotlin/cash/atto/node/CucumberConfiguration.kt b/src/test/kotlin/cash/atto/node/CucumberConfiguration.kt index f0760a22..72d054b8 100644 --- a/src/test/kotlin/cash/atto/node/CucumberConfiguration.kt +++ b/src/test/kotlin/cash/atto/node/CucumberConfiguration.kt @@ -35,7 +35,11 @@ class CucumberConfiguration( @Before fun before() = runBlocking { - NodeHolder.clear(context) + caches.forEach { + it.clear() + } + + NodeHolder.clear(except = context) NodeHolder.add(context) Waiter.waitUntilTrue { connectionManager.connectionCount == 0 } diff --git a/src/test/kotlin/cash/atto/node/NodeHolder.kt b/src/test/kotlin/cash/atto/node/NodeHolder.kt index 16a23671..d41c3eec 100644 --- a/src/test/kotlin/cash/atto/node/NodeHolder.kt +++ b/src/test/kotlin/cash/atto/node/NodeHolder.kt @@ -1,26 +1,48 @@ package cash.atto.node +import io.github.oshai.kotlinlogging.KotlinLogging import java.io.Closeable import java.net.URLClassLoader -import java.util.Collections object NodeHolder { - private val nodes = ArrayList() + private val logger = KotlinLogging.logger {} + + private val nodes = HashSet() fun add(context: Closeable) { nodes.add(context) } - fun getAll(): List = Collections.unmodifiableList(nodes) - fun clear(except: Closeable) { - nodes - .asSequence() - .filter { it != except } - .forEach { - it.close() - (it.javaClass.classLoader as URLClassLoader).close() + val toClose = nodes.filter { it != except } + + toClose.forEach { it.close() } + + toClose.forEach { node -> + val classLoader = node.javaClass.classLoader as? URLClassLoader ?: return@forEach + val deadline = System.currentTimeMillis() + 10_000 + while (System.currentTimeMillis() < deadline) { + val alive = + Thread.getAllStackTraces().keys.any { + it.contextClassLoader == classLoader && it.isAlive && !it.isDaemon + } + if (!alive) break + Thread.sleep(100) } + // Clear classloader reference from daemon threads (shared pool threads) to allow GC + Thread + .getAllStackTraces() + .keys + .filter { it.contextClassLoader == classLoader && it.isAlive && it.isDaemon } + .forEach { it.contextClassLoader = null } + + val remaining = Thread.getAllStackTraces().keys.filter { it.contextClassLoader == classLoader && it.isAlive } + if (remaining.isNotEmpty()) { + logger.warn { "Closing classloader with ${remaining.size} non-daemon threads still alive: ${remaining.map { it.name }}" } + } + classLoader.close() + } + nodes.clear() } } diff --git a/src/test/kotlin/cash/atto/node/PropertyHolder.kt b/src/test/kotlin/cash/atto/node/PropertyHolder.kt index e043ba70..b77dee44 100644 --- a/src/test/kotlin/cash/atto/node/PropertyHolder.kt +++ b/src/test/kotlin/cash/atto/node/PropertyHolder.kt @@ -76,6 +76,7 @@ object PropertyHolder { ): String = createKey(value.javaClass, key) fun clear() { + activeKeys.clear() properties.clear() } } diff --git a/src/test/resources/application-default.yaml b/src/test/resources/application-default.yaml index 55c07fd8..ebfd655c 100644 --- a/src/test/resources/application-default.yaml +++ b/src/test/resources/application-default.yaml @@ -31,6 +31,7 @@ logging: level: cash.atto: INFO cash.atto.node.network: TRACE + cash.atto.node.network.NetworkProcessor: DEBUG cash.atto.node.bootstrap: TRACE cash.atto.node.EventPublisher: TRACE io.r2dbc.h2: INFO