Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cash.atto.node.bootstrap.unchecked

import cash.atto.node.toBigInteger
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.datetime.toJavaInstant
import kotlinx.io.readByteArray
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Isolation
import org.springframework.transaction.annotation.Transactional
import reactor.core.publisher.Flux

@Component
class UncheckedTransactionInserter(
private val databaseClient: DatabaseClient,
) {
private val sql =
"""
INSERT INTO unchecked_transaction
(hash, public_key, height, previous, `timestamp`, serialized, received_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE hash = hash
""".trimIndent()

@Transactional(isolation = Isolation.READ_COMMITTED)
suspend fun insert(uncheckedTransactions: Collection<UncheckedTransaction>): Long {
if (uncheckedTransactions.isEmpty()) return 0

return databaseClient
.inConnection { conn ->
val last = uncheckedTransactions.size - 1

val statement = conn.createStatement(sql)
uncheckedTransactions.forEachIndexed { i, t ->
statement.apply {
bind(0, t.hash.value)
bind(1, t.publicKey.value)
bind(2, t.height.value.toBigInteger())
if (t.previous != null) {
bind(3, t.previous.value)
} else {
bindNull(3, ByteArray::class.java)
}
bind(4, t.block.timestamp.toJavaInstant())
bind(
5,
t
.toTransaction()
.toAttoTransaction()
.toBuffer()
.readByteArray(),
)
bind(6, t.receivedAt)

if (i != last) {
add()
}
}
}
Flux
.from(statement.execute())
.flatMap { it.rowsUpdated }
.reduce(0L, Long::plus)
}.awaitSingle()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Isolation
import org.springframework.transaction.annotation.Transactional
import java.util.concurrent.TimeUnit

Expand All @@ -32,7 +33,7 @@ class UncheckedTransactionProcessor(
) {
private val mutex = Mutex()

@Transactional
@Transactional(isolation = Isolation.READ_COMMITTED)
suspend fun process(candidateTransactions: Collection<Transaction>): Int {
if (candidateTransactions.isEmpty()) {
return 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ interface UncheckedTransactionRepository :
AttoRepository {
@Query(
"""
SELECT ut.* FROM unchecked_transaction ut
JOIN account a ON a.public_key = ut.public_key AND ut.height > a.height
ORDER BY timestamp
LIMIT :limit
SELECT ut.*
FROM unchecked_transaction AS ut
LEFT JOIN account AS a
ON a.public_key = ut.public_key
WHERE ut.height > COALESCE(a.height, 0)
ORDER BY ut.timestamp
LIMIT :limit;
""",
)
suspend fun findTopOldest(limit: Long): Flow<UncheckedTransaction>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,34 @@ import cash.atto.node.EventPublisher
import cash.atto.node.bootstrap.UncheckedTransactionSaved
import cash.atto.node.executeAfterCommit
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Isolation
import org.springframework.transaction.annotation.Transactional

@Service
class UncheckedTransactionService(
private val uncheckedTransactionRepository: UncheckedTransactionRepository,
private val uncheckedTransactionInserter: UncheckedTransactionInserter,
private val eventPublisher: EventPublisher,
) {
private val logger = KotlinLogging.logger {}

@Transactional(isolation = Isolation.SERIALIZABLE)
@Transactional(isolation = Isolation.READ_COMMITTED)
suspend fun save(uncheckedTransactions: Collection<UncheckedTransaction>) {
val existingTransactions =
uncheckedTransactionRepository
.findAllById(uncheckedTransactions.map { it.id })
.map { it.hash }
.toSet()

val transactionsToSave = uncheckedTransactions.filter { it.hash !in existingTransactions }

val savedTransactions = uncheckedTransactionRepository.saveAll(transactionsToSave).toList()
uncheckedTransactionInserter.insert(uncheckedTransactions)
executeAfterCommit {
savedTransactions.forEach {
uncheckedTransactions.forEach {
logger.debug { "Saved $it" }
eventPublisher.publish(UncheckedTransactionSaved(it))
}
}
}

@Transactional
@Transactional(isolation = Isolation.READ_COMMITTED)
suspend fun cleanUp() {
val deletedCount = uncheckedTransactionRepository.deleteExistingInTransaction()
logger.debug { "Deleted $deletedCount unchecked transactions" }
if (deletedCount > 0) {
logger.debug { "Deleted $deletedCount unchecked transactions" }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cash.atto.node.CacheSupport
import cash.atto.node.network.BroadcastNetworkMessage
import cash.atto.node.network.BroadcastStrategy
import cash.atto.node.network.InboundNetworkMessage
import cash.atto.node.network.MessageSource
import cash.atto.node.network.NetworkMessagePublisher
import cash.atto.protocol.AttoTransactionPush
import io.github.oshai.kotlinlogging.KotlinLogging
Expand Down Expand Up @@ -37,7 +38,7 @@ class TransactionRebroadcaster(
fun process(message: InboundNetworkMessage<AttoTransactionPush>) {
val transaction = message.payload.transaction

val new = broadcastQueue.seen(transaction, message.publicUri)
val new = broadcastQueue.seen(message.source, transaction, message.publicUri)

if (new) {
logger.trace { "Started monitoring transaction to rebroadcast. $transaction" }
Expand All @@ -46,8 +47,16 @@ class TransactionRebroadcaster(

@EventListener
suspend fun process(event: TransactionValidated) {
if (broadcastQueue.enqueue(event.transaction.hash)) {
logger.trace { "Transaction queued for rebroadcast. ${event.transaction}" }
val hash = event.transaction.hash
broadcastQueue.get(hash)?.let {
if (it.source == MessageSource.REST) {
logger.trace { "Rebroadcasting transaction from rest: ${event.transaction}" }
broadcastQueue.drop(hash)
broadcast(it.transaction)
}
}
if (broadcastQueue.enqueue(hash)) {
logger.trace { "Transaction queued for rebroadcast: ${event.transaction}" }
}
}

Expand Down Expand Up @@ -75,7 +84,7 @@ class TransactionRebroadcaster(

private fun broadcast(
transaction: AttoTransaction,
exceptions: Set<URI>,
exceptions: Set<URI> = setOf(),
) {
val transactionPush = AttoTransactionPush(transaction)
val message =
Expand All @@ -94,6 +103,7 @@ class TransactionRebroadcaster(
}

private class TransactionSocketAddressHolder(
val source: MessageSource,
val transaction: AttoTransaction,
) {
val publicUris = HashSet<URI>()
Expand All @@ -108,18 +118,21 @@ private class BroadcastQueue {
private val transactionQueue = Channel<TransactionSocketAddressHolder>(capacity = UNLIMITED)

fun seen(
source: MessageSource,
transaction: AttoTransaction,
publicUri: URI,
): Boolean {
val holder =
holderMap.compute(transaction.hash) { _, v ->
val holder = v ?: TransactionSocketAddressHolder(transaction)
val holder = v ?: TransactionSocketAddressHolder(source, transaction)
holder.add(publicUri)
holder
}
return holder!!.publicUris.size == 1
}

fun get(hash: AttoHash): TransactionSocketAddressHolder? = holderMap[hash]

fun drop(hash: AttoHash) {
holderMap.remove(hash)
}
Expand Down