Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions src/main/kotlin/cash/atto/node/network/NetworkProcessor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import io.ktor.server.routing.post
import io.ktor.server.routing.routing
import io.ktor.server.websocket.webSocket
import jakarta.annotation.PreDestroy
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
Expand Down Expand Up @@ -352,6 +353,7 @@ class NetworkProcessor(
val connectionSocketAddress = InetSocketAddress(call.request.origin.remoteHost, call.request.origin.remotePort)

connectionManager.manage(node, connectionSocketAddress, this)
} catch (_: CancellationException) {
} catch (e: Exception) {
logger.trace(e) { "Exception during handshake with ${call.request.origin.remoteHost}" }
call.respond(HttpStatusCode.InternalServerError)
Expand All @@ -370,8 +372,8 @@ class NetworkProcessor(
clear()
httpClient.close()
websocketClient.close()
server.stop()
scope.cancel()
server.stop()
}

@Scheduled(fixedRate = 1_000)
Expand Down Expand Up @@ -403,12 +405,8 @@ class NetworkProcessor(
}

scope.launch {
try {
logger.trace { "Start connection to $publicUri" }
connection(publicUri)
} catch (e: Exception) {
logger.trace(e) { "Exception during connection to $publicUri" }
}
logger.trace { "Start connection to $publicUri" }
connection(publicUri)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.ktor.websocket.WebSocketSession
import io.ktor.websocket.close
import io.ktor.websocket.readBytes
import jakarta.annotation.PreDestroy
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
Expand Down Expand Up @@ -105,7 +106,8 @@ class NodeConnectionManager(
.onStart {
eventPublisher.publish(NodeConnected(connectionSocketAddress, node))
}.onCompletion {
logger.trace(it) { "Inbound message stream from ${node.publicUri} completed" }
val cause = it?.takeUnless { it is CancellationException }
logger.trace(cause) { "Inbound message stream from ${node.publicUri} completed" }
connectionMap.remove(publicUri)
}.collect {
val message = NetworkSerializer.deserialize(it)
Expand Down Expand Up @@ -133,8 +135,8 @@ class NodeConnectionManager(
messagePublisher.publish(networkMessage)
}
} catch (e: Exception) {
logger.debug(e) { "Exception during inbound message stream from ${node.publicUri}" }
connectionMap.remove(publicUri)
throw e
} finally {
connection.disconnect()
}
Expand Down Expand Up @@ -208,8 +210,9 @@ class NodeConnectionManager(
.consumeAsFlow()
.filterIsInstance<Frame.Binary>()
.onStart { logger.info { "Connected to ${node.publicUri} ${node.publicKey}" } }
.onCompletion { cause -> logger.info(cause) { "Disconnected from ${node.publicUri}" } }
.map { it.readBytes() }
.onCompletion { cause ->
logger.info(cause?.takeUnless { it is CancellationException }) { "Disconnected from ${node.publicUri}" }
}.map { it.readBytes() }

suspend fun disconnect() {
try {
Expand Down
6 changes: 5 additions & 1 deletion src/test/kotlin/cash/atto/node/CucumberConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ class CucumberConfiguration(
@Before
fun before() =
runBlocking {
NodeHolder.clear(context)
caches.forEach {
it.clear()
}

NodeHolder.clear(except = context)
NodeHolder.add(context)

Waiter.waitUntilTrue { connectionManager.connectionCount == 0 }
Expand Down
42 changes: 32 additions & 10 deletions src/test/kotlin/cash/atto/node/NodeHolder.kt
Original file line number Diff line number Diff line change
@@ -1,26 +1,48 @@
package cash.atto.node

import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.Closeable
import java.net.URLClassLoader
import java.util.Collections

object NodeHolder {
private val nodes = ArrayList<Closeable>()
private val logger = KotlinLogging.logger {}

private val nodes = HashSet<Closeable>()

fun add(context: Closeable) {
nodes.add(context)
}

fun getAll(): List<Closeable> = Collections.unmodifiableList(nodes)

fun clear(except: Closeable) {
nodes
.asSequence()
.filter { it != except }
.forEach {
it.close()
(it.javaClass.classLoader as URLClassLoader).close()
val toClose = nodes.filter { it != except }

toClose.forEach { it.close() }

toClose.forEach { node ->
val classLoader = node.javaClass.classLoader as? URLClassLoader ?: return@forEach
val deadline = System.currentTimeMillis() + 10_000
while (System.currentTimeMillis() < deadline) {
val alive =
Thread.getAllStackTraces().keys.any {
it.contextClassLoader == classLoader && it.isAlive && !it.isDaemon
}
if (!alive) break
Thread.sleep(100)
}
// Clear classloader reference from daemon threads (shared pool threads) to allow GC
Thread
.getAllStackTraces()
.keys
.filter { it.contextClassLoader == classLoader && it.isAlive && it.isDaemon }
.forEach { it.contextClassLoader = null }

val remaining = Thread.getAllStackTraces().keys.filter { it.contextClassLoader == classLoader && it.isAlive }
if (remaining.isNotEmpty()) {
logger.warn { "Closing classloader with ${remaining.size} non-daemon threads still alive: ${remaining.map { it.name }}" }
}
classLoader.close()
}

nodes.clear()
}
}
1 change: 1 addition & 0 deletions src/test/kotlin/cash/atto/node/PropertyHolder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ object PropertyHolder {
): String = createKey(value.javaClass, key)

fun clear() {
activeKeys.clear()
properties.clear()
}
}
1 change: 1 addition & 0 deletions src/test/resources/application-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ logging:
level:
cash.atto: INFO
cash.atto.node.network: TRACE
cash.atto.node.network.NetworkProcessor: DEBUG
cash.atto.node.bootstrap: TRACE
cash.atto.node.EventPublisher: TRACE
io.r2dbc.h2: INFO
Expand Down
Loading