From d270f87872b68346ce990cb5e1de99c05185978c Mon Sep 17 00:00:00 2001 From: rotilho Date: Sun, 28 Sep 2025 18:11:30 +0200 Subject: [PATCH 1/4] Introduce `UncheckedTransactionInserter` to batch insert transactions --- .../unchecked/UncheckedTransactionInserter.kt | 55 +++++++++++++++++++ .../unchecked/UncheckedTransactionService.kt | 19 ++----- 2 files changed, 59 insertions(+), 15 deletions(-) create mode 100644 src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt new file mode 100644 index 00000000..898e2910 --- /dev/null +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt @@ -0,0 +1,55 @@ +package cash.atto.node.bootstrap.unchecked + +import cash.atto.node.toBigInteger +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.datetime.toJavaInstant +import kotlinx.io.readByteArray +import org.springframework.r2dbc.core.DatabaseClient +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Transactional +import reactor.core.publisher.Flux + +@Component +class UncheckedTransactionInserter( + private val databaseClient: DatabaseClient +) { + private val sql = """ + INSERT INTO unchecked_transaction + (hash, public_key, height, previous, `timestamp`, serialized, received_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE hash = hash + """.trimIndent() + + @Transactional + suspend fun insert(uncheckedTransactions: Collection): Long { + if (uncheckedTransactions.isEmpty()) return 0 + + return databaseClient.inConnection { conn -> + val last = uncheckedTransactions.size - 1 + + val statement = conn.createStatement(sql) + uncheckedTransactions.forEachIndexed { i, t -> + statement.apply { + bind(0, t.hash.value) + bind(1, t.publicKey.value) + bind(2, t.height.value.toBigInteger()) + if (t.previous != null) { + bind(3, t.previous.value) + } else { + bindNull(3, ByteArray::class.java) + } + bind(4, t.block.timestamp.toJavaInstant()) + bind(5, t.toTransaction().toAttoTransaction().toBuffer().readByteArray()) + bind(6, t.receivedAt) + + if (i != last) { + add() + } + } + } + Flux.from(statement.execute()) + .flatMap { it.rowsUpdated } + .reduce(0L, Long::plus) + }.awaitSingle() + } +} diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt index ab8bc744..b181c49b 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt @@ -4,33 +4,22 @@ import cash.atto.node.EventPublisher import cash.atto.node.bootstrap.UncheckedTransactionSaved import cash.atto.node.executeAfterCommit import io.github.oshai.kotlinlogging.KotlinLogging -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.flow.toSet import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Isolation import org.springframework.transaction.annotation.Transactional @Service class UncheckedTransactionService( private val uncheckedTransactionRepository: UncheckedTransactionRepository, + private val uncheckedTransactionInserter: UncheckedTransactionInserter, private val eventPublisher: EventPublisher, ) { private val logger = KotlinLogging.logger {} - @Transactional(isolation = Isolation.SERIALIZABLE) + @Transactional suspend fun save(uncheckedTransactions: Collection) { - val existingTransactions = - uncheckedTransactionRepository - .findAllById(uncheckedTransactions.map { it.id }) - .map { it.hash } - .toSet() - - val transactionsToSave = uncheckedTransactions.filter { it.hash !in existingTransactions } - - val savedTransactions = uncheckedTransactionRepository.saveAll(transactionsToSave).toList() + uncheckedTransactionInserter.insert(uncheckedTransactions) executeAfterCommit { - savedTransactions.forEach { + uncheckedTransactions.forEach { logger.debug { "Saved $it" } eventPublisher.publish(UncheckedTransactionSaved(it)) } From 5a79b8a0b3cd1f8cc5665ce52dd6683a1c96a891 Mon Sep 17 00:00:00 2001 From: rotilho Date: Mon, 29 Sep 2025 08:24:48 +0200 Subject: [PATCH 2/4] Fix unchecked transaction query to handle missing accounts --- .../unchecked/UncheckedTransactionRepository.kt | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt index 6f3afb01..12e77611 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt @@ -14,10 +14,13 @@ interface UncheckedTransactionRepository : AttoRepository { @Query( """ - SELECT ut.* FROM unchecked_transaction ut - JOIN account a ON a.public_key = ut.public_key AND ut.height > a.height - ORDER BY timestamp - LIMIT :limit + SELECT ut.* + FROM unchecked_transaction AS ut + LEFT JOIN account AS a + ON a.public_key = ut.public_key + WHERE ut.height > COALESCE(a.height, 0) + ORDER BY ut.timestamp + LIMIT :limit; """, ) suspend fun findTopOldest(limit: Long): Flow From 6d108ccba3c47d3d33424d21c99205438fbca8af Mon Sep 17 00:00:00 2001 From: rotilho Date: Mon, 29 Sep 2025 08:32:17 +0200 Subject: [PATCH 3/4] Immediately rebroadcast transactions when the source is REST --- .../transaction/TransactionRebroadcaster.kt | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt b/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt index 333f2d25..e1a741ba 100644 --- a/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt +++ b/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt @@ -6,6 +6,7 @@ import cash.atto.node.CacheSupport import cash.atto.node.network.BroadcastNetworkMessage import cash.atto.node.network.BroadcastStrategy import cash.atto.node.network.InboundNetworkMessage +import cash.atto.node.network.MessageSource import cash.atto.node.network.NetworkMessagePublisher import cash.atto.protocol.AttoTransactionPush import io.github.oshai.kotlinlogging.KotlinLogging @@ -37,7 +38,7 @@ class TransactionRebroadcaster( fun process(message: InboundNetworkMessage) { val transaction = message.payload.transaction - val new = broadcastQueue.seen(transaction, message.publicUri) + val new = broadcastQueue.seen(message.source, transaction, message.publicUri) if (new) { logger.trace { "Started monitoring transaction to rebroadcast. $transaction" } @@ -46,8 +47,16 @@ class TransactionRebroadcaster( @EventListener suspend fun process(event: TransactionValidated) { - if (broadcastQueue.enqueue(event.transaction.hash)) { - logger.trace { "Transaction queued for rebroadcast. ${event.transaction}" } + val hash = event.transaction.hash + broadcastQueue.get(hash)?.let { + if (it.source == MessageSource.REST) { + logger.trace { "Rebroadcasting transaction from rest: ${event.transaction}" } + broadcastQueue.drop(hash) + broadcast(it.transaction) + } + } + if (broadcastQueue.enqueue(hash)) { + logger.trace { "Transaction queued for rebroadcast: ${event.transaction}" } } } @@ -75,7 +84,7 @@ class TransactionRebroadcaster( private fun broadcast( transaction: AttoTransaction, - exceptions: Set, + exceptions: Set = setOf(), ) { val transactionPush = AttoTransactionPush(transaction) val message = @@ -94,6 +103,7 @@ class TransactionRebroadcaster( } private class TransactionSocketAddressHolder( + val source: MessageSource, val transaction: AttoTransaction, ) { val publicUris = HashSet() @@ -108,18 +118,21 @@ private class BroadcastQueue { private val transactionQueue = Channel(capacity = UNLIMITED) fun seen( + source: MessageSource, transaction: AttoTransaction, publicUri: URI, ): Boolean { val holder = holderMap.compute(transaction.hash) { _, v -> - val holder = v ?: TransactionSocketAddressHolder(transaction) + val holder = v ?: TransactionSocketAddressHolder(source, transaction) holder.add(publicUri) holder } return holder!!.publicUris.size == 1 } + fun get(hash: AttoHash): TransactionSocketAddressHolder? = holderMap[hash] + fun drop(hash: AttoHash) { holderMap.remove(hash) } From 98eba46125dbf74635d4ef0aae5cb15dc51268cc Mon Sep 17 00:00:00 2001 From: rotilho Date: Mon, 29 Sep 2025 15:06:26 +0200 Subject: [PATCH 4/4] Reduce dead locks --- .../unchecked/UncheckedTransactionInserter.kt | 65 +++++++++++-------- .../UncheckedTransactionProcessor.kt | 3 +- .../unchecked/UncheckedTransactionService.kt | 9 ++- 3 files changed, 46 insertions(+), 31 deletions(-) diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt index 898e2910..3125789c 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt @@ -6,50 +6,61 @@ import kotlinx.datetime.toJavaInstant import kotlinx.io.readByteArray import org.springframework.r2dbc.core.DatabaseClient import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Isolation import org.springframework.transaction.annotation.Transactional import reactor.core.publisher.Flux @Component class UncheckedTransactionInserter( - private val databaseClient: DatabaseClient + private val databaseClient: DatabaseClient, ) { - private val sql = """ + private val sql = + """ INSERT INTO unchecked_transaction (hash, public_key, height, previous, `timestamp`, serialized, received_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE hash = hash - """.trimIndent() + """.trimIndent() - @Transactional + @Transactional(isolation = Isolation.READ_COMMITTED) suspend fun insert(uncheckedTransactions: Collection): Long { if (uncheckedTransactions.isEmpty()) return 0 - return databaseClient.inConnection { conn -> - val last = uncheckedTransactions.size - 1 + return databaseClient + .inConnection { conn -> + val last = uncheckedTransactions.size - 1 - val statement = conn.createStatement(sql) - uncheckedTransactions.forEachIndexed { i, t -> - statement.apply { - bind(0, t.hash.value) - bind(1, t.publicKey.value) - bind(2, t.height.value.toBigInteger()) - if (t.previous != null) { - bind(3, t.previous.value) - } else { - bindNull(3, ByteArray::class.java) - } - bind(4, t.block.timestamp.toJavaInstant()) - bind(5, t.toTransaction().toAttoTransaction().toBuffer().readByteArray()) - bind(6, t.receivedAt) + val statement = conn.createStatement(sql) + uncheckedTransactions.forEachIndexed { i, t -> + statement.apply { + bind(0, t.hash.value) + bind(1, t.publicKey.value) + bind(2, t.height.value.toBigInteger()) + if (t.previous != null) { + bind(3, t.previous.value) + } else { + bindNull(3, ByteArray::class.java) + } + bind(4, t.block.timestamp.toJavaInstant()) + bind( + 5, + t + .toTransaction() + .toAttoTransaction() + .toBuffer() + .readByteArray(), + ) + bind(6, t.receivedAt) - if (i != last) { - add() + if (i != last) { + add() + } } } - } - Flux.from(statement.execute()) - .flatMap { it.rowsUpdated } - .reduce(0L, Long::plus) - }.awaitSingle() + Flux + .from(statement.execute()) + .flatMap { it.rowsUpdated } + .reduce(0L, Long::plus) + }.awaitSingle() } } diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt index fcdd74ee..35c15243 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt @@ -18,6 +18,7 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Isolation import org.springframework.transaction.annotation.Transactional import java.util.concurrent.TimeUnit @@ -32,7 +33,7 @@ class UncheckedTransactionProcessor( ) { private val mutex = Mutex() - @Transactional + @Transactional(isolation = Isolation.READ_COMMITTED) suspend fun process(candidateTransactions: Collection): Int { if (candidateTransactions.isEmpty()) { return 0 diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt index b181c49b..de700b64 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt @@ -5,6 +5,7 @@ import cash.atto.node.bootstrap.UncheckedTransactionSaved import cash.atto.node.executeAfterCommit import io.github.oshai.kotlinlogging.KotlinLogging import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Isolation import org.springframework.transaction.annotation.Transactional @Service @@ -15,7 +16,7 @@ class UncheckedTransactionService( ) { private val logger = KotlinLogging.logger {} - @Transactional + @Transactional(isolation = Isolation.READ_COMMITTED) suspend fun save(uncheckedTransactions: Collection) { uncheckedTransactionInserter.insert(uncheckedTransactions) executeAfterCommit { @@ -26,9 +27,11 @@ class UncheckedTransactionService( } } - @Transactional + @Transactional(isolation = Isolation.READ_COMMITTED) suspend fun cleanUp() { val deletedCount = uncheckedTransactionRepository.deleteExistingInTransaction() - logger.debug { "Deleted $deletedCount unchecked transactions" } + if (deletedCount > 0) { + logger.debug { "Deleted $deletedCount unchecked transactions" } + } } }