diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt new file mode 100644 index 00000000..3125789c --- /dev/null +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionInserter.kt @@ -0,0 +1,66 @@ +package cash.atto.node.bootstrap.unchecked + +import cash.atto.node.toBigInteger +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.datetime.toJavaInstant +import kotlinx.io.readByteArray +import org.springframework.r2dbc.core.DatabaseClient +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Isolation +import org.springframework.transaction.annotation.Transactional +import reactor.core.publisher.Flux + +@Component +class UncheckedTransactionInserter( + private val databaseClient: DatabaseClient, +) { + private val sql = + """ + INSERT INTO unchecked_transaction + (hash, public_key, height, previous, `timestamp`, serialized, received_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE hash = hash + """.trimIndent() + + @Transactional(isolation = Isolation.READ_COMMITTED) + suspend fun insert(uncheckedTransactions: Collection): Long { + if (uncheckedTransactions.isEmpty()) return 0 + + return databaseClient + .inConnection { conn -> + val last = uncheckedTransactions.size - 1 + + val statement = conn.createStatement(sql) + uncheckedTransactions.forEachIndexed { i, t -> + statement.apply { + bind(0, t.hash.value) + bind(1, t.publicKey.value) + bind(2, t.height.value.toBigInteger()) + if (t.previous != null) { + bind(3, t.previous.value) + } else { + bindNull(3, ByteArray::class.java) + } + bind(4, t.block.timestamp.toJavaInstant()) + bind( + 5, + t + .toTransaction() + .toAttoTransaction() + .toBuffer() + .readByteArray(), + ) + bind(6, t.receivedAt) + + if (i != last) { + add() + } + } + } + Flux + .from(statement.execute()) + .flatMap { it.rowsUpdated } + .reduce(0L, Long::plus) + }.awaitSingle() + } +} diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt index fcdd74ee..35c15243 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionProcessor.kt @@ -18,6 +18,7 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Isolation import org.springframework.transaction.annotation.Transactional import java.util.concurrent.TimeUnit @@ -32,7 +33,7 @@ class UncheckedTransactionProcessor( ) { private val mutex = Mutex() - @Transactional + @Transactional(isolation = Isolation.READ_COMMITTED) suspend fun process(candidateTransactions: Collection): Int { if (candidateTransactions.isEmpty()) { return 0 diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt index 6f3afb01..12e77611 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionRepository.kt @@ -14,10 +14,13 @@ interface UncheckedTransactionRepository : AttoRepository { @Query( """ - SELECT ut.* FROM unchecked_transaction ut - JOIN account a ON a.public_key = ut.public_key AND ut.height > a.height - ORDER BY timestamp - LIMIT :limit + SELECT ut.* + FROM unchecked_transaction AS ut + LEFT JOIN account AS a + ON a.public_key = ut.public_key + WHERE ut.height > COALESCE(a.height, 0) + ORDER BY ut.timestamp + LIMIT :limit; """, ) suspend fun findTopOldest(limit: Long): Flow diff --git a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt index ab8bc744..de700b64 100644 --- a/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt +++ b/src/main/kotlin/cash/atto/node/bootstrap/unchecked/UncheckedTransactionService.kt @@ -4,9 +4,6 @@ import cash.atto.node.EventPublisher import cash.atto.node.bootstrap.UncheckedTransactionSaved import cash.atto.node.executeAfterCommit import io.github.oshai.kotlinlogging.KotlinLogging -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.flow.toSet import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Isolation import org.springframework.transaction.annotation.Transactional @@ -14,32 +11,27 @@ import org.springframework.transaction.annotation.Transactional @Service class UncheckedTransactionService( private val uncheckedTransactionRepository: UncheckedTransactionRepository, + private val uncheckedTransactionInserter: UncheckedTransactionInserter, private val eventPublisher: EventPublisher, ) { private val logger = KotlinLogging.logger {} - @Transactional(isolation = Isolation.SERIALIZABLE) + @Transactional(isolation = Isolation.READ_COMMITTED) suspend fun save(uncheckedTransactions: Collection) { - val existingTransactions = - uncheckedTransactionRepository - .findAllById(uncheckedTransactions.map { it.id }) - .map { it.hash } - .toSet() - - val transactionsToSave = uncheckedTransactions.filter { it.hash !in existingTransactions } - - val savedTransactions = uncheckedTransactionRepository.saveAll(transactionsToSave).toList() + uncheckedTransactionInserter.insert(uncheckedTransactions) executeAfterCommit { - savedTransactions.forEach { + uncheckedTransactions.forEach { logger.debug { "Saved $it" } eventPublisher.publish(UncheckedTransactionSaved(it)) } } } - @Transactional + @Transactional(isolation = Isolation.READ_COMMITTED) suspend fun cleanUp() { val deletedCount = uncheckedTransactionRepository.deleteExistingInTransaction() - logger.debug { "Deleted $deletedCount unchecked transactions" } + if (deletedCount > 0) { + logger.debug { "Deleted $deletedCount unchecked transactions" } + } } } diff --git a/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt b/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt index 333f2d25..e1a741ba 100644 --- a/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt +++ b/src/main/kotlin/cash/atto/node/transaction/TransactionRebroadcaster.kt @@ -6,6 +6,7 @@ import cash.atto.node.CacheSupport import cash.atto.node.network.BroadcastNetworkMessage import cash.atto.node.network.BroadcastStrategy import cash.atto.node.network.InboundNetworkMessage +import cash.atto.node.network.MessageSource import cash.atto.node.network.NetworkMessagePublisher import cash.atto.protocol.AttoTransactionPush import io.github.oshai.kotlinlogging.KotlinLogging @@ -37,7 +38,7 @@ class TransactionRebroadcaster( fun process(message: InboundNetworkMessage) { val transaction = message.payload.transaction - val new = broadcastQueue.seen(transaction, message.publicUri) + val new = broadcastQueue.seen(message.source, transaction, message.publicUri) if (new) { logger.trace { "Started monitoring transaction to rebroadcast. $transaction" } @@ -46,8 +47,16 @@ class TransactionRebroadcaster( @EventListener suspend fun process(event: TransactionValidated) { - if (broadcastQueue.enqueue(event.transaction.hash)) { - logger.trace { "Transaction queued for rebroadcast. ${event.transaction}" } + val hash = event.transaction.hash + broadcastQueue.get(hash)?.let { + if (it.source == MessageSource.REST) { + logger.trace { "Rebroadcasting transaction from rest: ${event.transaction}" } + broadcastQueue.drop(hash) + broadcast(it.transaction) + } + } + if (broadcastQueue.enqueue(hash)) { + logger.trace { "Transaction queued for rebroadcast: ${event.transaction}" } } } @@ -75,7 +84,7 @@ class TransactionRebroadcaster( private fun broadcast( transaction: AttoTransaction, - exceptions: Set, + exceptions: Set = setOf(), ) { val transactionPush = AttoTransactionPush(transaction) val message = @@ -94,6 +103,7 @@ class TransactionRebroadcaster( } private class TransactionSocketAddressHolder( + val source: MessageSource, val transaction: AttoTransaction, ) { val publicUris = HashSet() @@ -108,18 +118,21 @@ private class BroadcastQueue { private val transactionQueue = Channel(capacity = UNLIMITED) fun seen( + source: MessageSource, transaction: AttoTransaction, publicUri: URI, ): Boolean { val holder = holderMap.compute(transaction.hash) { _, v -> - val holder = v ?: TransactionSocketAddressHolder(transaction) + val holder = v ?: TransactionSocketAddressHolder(source, transaction) holder.add(publicUri) holder } return holder!!.publicUris.size == 1 } + fun get(hash: AttoHash): TransactionSocketAddressHolder? = holderMap[hash] + fun drop(hash: AttoHash) { holderMap.remove(hash) }