diff --git a/src/main/kotlin/cash/atto/node/election/ElectionProcessor.kt b/src/main/kotlin/cash/atto/node/election/ElectionProcessor.kt index 7048f86b..0a386eb0 100644 --- a/src/main/kotlin/cash/atto/node/election/ElectionProcessor.kt +++ b/src/main/kotlin/cash/atto/node/election/ElectionProcessor.kt @@ -5,8 +5,6 @@ import cash.atto.node.network.BroadcastNetworkMessage import cash.atto.node.network.BroadcastStrategy import cash.atto.node.network.NetworkMessagePublisher import cash.atto.node.transaction.TransactionSource -import cash.atto.node.vote.VoteService -import cash.atto.protocol.AttoNode import cash.atto.protocol.AttoTransactionPush import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -23,10 +21,8 @@ import java.util.concurrent.TimeUnit @Service class ElectionProcessor( - private val thisNode: AttoNode, private val messagePublisher: NetworkMessagePublisher, private val accountService: AccountService, - private val voteService: VoteService, transactionManager: ReactiveTransactionManager, ) { private val logger = KotlinLogging.logger {} @@ -85,11 +81,6 @@ class ElectionProcessor( transactionalOperator.executeAndAwait { accountService.add(TransactionSource.ELECTION, transactions) - - if (thisNode.isHistorical()) { - val finalVotes = events.flatMap { it.votes }.filter { it.isFinal() } - voteService.saveAll(finalVotes) - } } } catch (e: Exception) { throw RuntimeException("Error while processing ${events.map { it.transaction.hash }}", e) diff --git a/src/main/kotlin/cash/atto/node/election/ElectionVoteProcessor.kt b/src/main/kotlin/cash/atto/node/election/ElectionVoteProcessor.kt new file mode 100644 index 00000000..0c796588 --- /dev/null +++ b/src/main/kotlin/cash/atto/node/election/ElectionVoteProcessor.kt @@ -0,0 +1,68 @@ +package cash.atto.node.election + +import cash.atto.commons.AttoPublicKey +import cash.atto.node.vote.Vote +import cash.atto.node.vote.VoteService +import cash.atto.node.vote.weight.WeightService +import cash.atto.protocol.AttoNode +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.springframework.context.event.EventListener +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Service +import java.time.Instant +import java.util.concurrent.TimeUnit + +@Service +class ElectionVoteProcessor( + private val thisNode: AttoNode, + private val voteService: VoteService, + private val weightService: WeightService, +) { + private val buffer = Channel(Channel.UNLIMITED) + private val mutex = Mutex() + + @EventListener + suspend fun process(event: ElectionConsensusReached) { + buffer.send(event) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Scheduled(fixedRate = 1, timeUnit = TimeUnit.MILLISECONDS) + suspend fun flush() { + if (mutex.isLocked) { + return + } + mutex.withLock { + while (!buffer.isEmpty) { + flushBatch(1_000) + } + } + } + + private suspend fun flushBatch(size: Int) { + val finalVotes = mutableListOf() + val latestTimestamps = HashMap() + + while (finalVotes.size < size) { + val event = buffer.tryReceive().getOrNull() ?: break + + for (vote in event.votes) { + if (thisNode.isHistorical() && vote.isFinal()) { + finalVotes.add(vote) + } + latestTimestamps.merge(vote.publicKey, vote.receivedAt) { old, new -> + if (new > old) new else old + } + } + } + + weightService.updateLastVoteTimestamps(latestTimestamps) + + if (thisNode.isHistorical() && finalVotes.isNotEmpty()) { + voteService.saveAll(finalVotes) + } + } +} diff --git a/src/main/kotlin/cash/atto/node/vote/weight/VoteWeightController.kt b/src/main/kotlin/cash/atto/node/vote/weight/VoteWeightController.kt index bf89a890..ce015e42 100644 --- a/src/main/kotlin/cash/atto/node/vote/weight/VoteWeightController.kt +++ b/src/main/kotlin/cash/atto/node/vote/weight/VoteWeightController.kt @@ -39,7 +39,7 @@ class VoteWeightController( @PathVariable address: AttoAddress, ): ResponseEntity { val weight = voteWeighter.get(address.publicKey) - val lastVotedAt = voteWeighter.getLastestVote(address.publicKey)?.receivedAt?.toAtto() ?: Instant.EPOCH.toAtto() + val lastVotedAt = voteWeighter.getLatestVoteTimestamp(address.publicKey)?.toAtto() ?: Instant.EPOCH.toAtto() val voterWeight = AttoVoterWeight(address, weight, lastVotedAt) return ResponseEntity.ok(voterWeight) } diff --git a/src/main/kotlin/cash/atto/node/vote/weight/VoteWeighter.kt b/src/main/kotlin/cash/atto/node/vote/weight/VoteWeighter.kt index 8e6be3bd..20d1d1be 100644 --- a/src/main/kotlin/cash/atto/node/vote/weight/VoteWeighter.kt +++ b/src/main/kotlin/cash/atto/node/vote/weight/VoteWeighter.kt @@ -10,14 +10,11 @@ import cash.atto.commons.toAttoAmount import cash.atto.node.CacheSupport import cash.atto.node.account.AccountUpdated import cash.atto.node.transaction.Transaction -import cash.atto.node.vote.Vote -import cash.atto.node.vote.VoteRepository import cash.atto.node.vote.VoteValidated import cash.atto.protocol.AttoNode import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.annotation.PostConstruct import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.springframework.context.annotation.DependsOn @@ -38,13 +35,11 @@ class VoteWeighter( val thisNode: AttoNode, val properties: VoteWeightProperties, val weightService: WeightService, - val voteRepository: VoteRepository, val genesisTransaction: Transaction, ) : CacheSupport { private val logger = KotlinLogging.logger {} - private val weightMap = ConcurrentHashMap() - private val latestVoteMap = ConcurrentHashMap() + private val weightMap = ConcurrentHashMap() private lateinit var onlineWeight: AttoAmount private lateinit var minimalRebroadcastWeight: AttoAmount private lateinit var minimalConfirmationWeight: AttoAmount @@ -52,39 +47,28 @@ class VoteWeighter( @PostConstruct override fun init() = runBlocking { - val weights = + val allWeights = weightService .refresh() - .map { it.representativePublicKey to it.weight } .toList() - .toMap() - - weightMap.putAll(weights) - - val minTimestamp = getMinTimestamp() - val voteMap = - voteRepository - .findLatestAfter(minTimestamp) - .asSequence() - .map { it.publicKey to it } - .toMap() - latestVoteMap.putAll(voteMap) + + weightMap.putAll(allWeights.associateBy { it.representativePublicKey }) + calculateMinimalWeights() } override fun clear() { weightMap.clear() - latestVoteMap.clear() } @EventListener fun listen(event: VoteValidated) { val vote = event.vote - latestVoteMap.compute(vote.publicKey) { _, previousHashVote -> - if (previousHashVote == null || vote.receivedAt > previousHashVote.receivedAt) { - vote + weightMap.computeIfPresent(vote.publicKey) { _, existing -> + if (existing.lastVoteTimestamp == null || vote.receivedAt > existing.lastVoteTimestamp) { + existing.copy(lastVoteTimestamp = vote.receivedAt) } else { - previousHashVote + existing } } } @@ -123,14 +107,7 @@ class VoteWeighter( logger.trace { "Weight updated $weightMap" } } - fun getMap(): LinkedHashMap = - weightMap.entries - .asSequence() - .sortedByDescending { it.value } - .associate { it.key to it.value } - .toMap(LinkedHashMap()) - - fun getLastestVote(publicKey: AttoPublicKey): Vote? = latestVoteMap[publicKey] + fun getLatestVoteTimestamp(publicKey: AttoPublicKey): Instant? = weightMap[publicKey]?.lastVoteTimestamp private suspend fun add( publicKey: AttoPublicKey, @@ -138,11 +115,11 @@ class VoteWeighter( defaultAmount: AttoAmount, ) { retryUntilSuccess { - weightMap.compute(publicKey) { _, weight -> - if (weight == null) { - defaultAmount + weightMap.compute(publicKey) { _, existing -> + if (existing == null) { + Weight(representativePublicKey = publicKey, weight = defaultAmount) } else { - weight + amount + existing.copy(weight = existing.weight + amount) } } } @@ -154,15 +131,15 @@ class VoteWeighter( defaultAmount: AttoAmount, ) { retryUntilSuccess { - weightMap.compute(publicKey) { _, weight -> + weightMap.compute(publicKey) { _, existing -> val newWeight = - if (weight == null) { + if (existing == null) { defaultAmount } else { - weight - amount + existing.weight - amount } if (newWeight > AttoAmount.MIN) { - return@compute newWeight + return@compute (existing ?: Weight(representativePublicKey = publicKey, weight = newWeight)).copy(weight = newWeight) } else { return@compute null } @@ -170,9 +147,9 @@ class VoteWeighter( } } - fun getAll(): Map = weightMap.toMap() + fun getAll(): Map = weightMap.mapValues { it.value.weight } - fun get(publicKey: AttoPublicKey): AttoAmount = weightMap[publicKey] ?: AttoAmount.MIN + fun get(publicKey: AttoPublicKey): AttoAmount = weightMap[publicKey]?.weight ?: AttoAmount.MIN fun get(): AttoAmount = get(thisNode.publicKey) @@ -199,11 +176,11 @@ class VoteWeighter( val onlineWeights = weightMap .asSequence() - .filter { minTimestamp < (latestVoteMap[it.key]?.receivedAt ?: Instant.MIN) } - .sortedByDescending { it.value.raw } + .filter { minTimestamp < (it.value.lastVoteTimestamp ?: Instant.MIN) } + .sortedByDescending { it.value.weight.raw } .toList() - val onlineWeight = onlineWeights.sumOf { it.value.raw } + val onlineWeight = onlineWeights.sumOf { it.value.weight.raw } this.onlineWeight = onlineWeight.toAttoAmount() @@ -223,9 +200,9 @@ class VoteWeighter( logger.info { "Minimal confirmation weight updated to ${this.minimalConfirmationWeight}" } if (onlineWeights.size >= 10) { - this.minimalRebroadcastWeight = onlineWeights[9].value + this.minimalRebroadcastWeight = onlineWeights[9].value.weight } else if (onlineWeights.isNotEmpty()) { - this.minimalRebroadcastWeight = onlineWeights.last().value + this.minimalRebroadcastWeight = onlineWeights.last().value.weight } else { this.minimalRebroadcastWeight = properties.minimalRebroadcastWeight!!.replace("_", "").toAttoAmount() } diff --git a/src/main/kotlin/cash/atto/node/vote/weight/Weight.kt b/src/main/kotlin/cash/atto/node/vote/weight/Weight.kt index 6a375f8f..6d4feb96 100644 --- a/src/main/kotlin/cash/atto/node/vote/weight/Weight.kt +++ b/src/main/kotlin/cash/atto/node/vote/weight/Weight.kt @@ -3,9 +3,11 @@ package cash.atto.node.vote.weight import cash.atto.commons.AttoAmount import cash.atto.commons.AttoPublicKey import org.springframework.data.annotation.Id +import java.time.Instant data class Weight( @Id val representativePublicKey: AttoPublicKey, val weight: AttoAmount, + val lastVoteTimestamp: Instant? = null, ) diff --git a/src/main/kotlin/cash/atto/node/vote/weight/WeightRepository.kt b/src/main/kotlin/cash/atto/node/vote/weight/WeightRepository.kt index 94a47993..273018e4 100644 --- a/src/main/kotlin/cash/atto/node/vote/weight/WeightRepository.kt +++ b/src/main/kotlin/cash/atto/node/vote/weight/WeightRepository.kt @@ -4,17 +4,43 @@ import cash.atto.commons.AttoPublicKey import org.springframework.data.r2dbc.repository.Modifying import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.kotlin.CoroutineCrudRepository +import java.time.Instant interface WeightRepository : CoroutineCrudRepository { @Modifying @Query( """ INSERT INTO weight (representative_algorithm, representative_public_key, weight) - SELECT representative_algorithm, representative_public_key, CAST(SUM(balance) AS UNSIGNED) AS weight - FROM account - GROUP BY representative_algorithm, representative_public_key - HAVING weight > 0 + SELECT a.representative_algorithm, a.representative_public_key, CAST(SUM(a.balance) AS UNSIGNED) AS weight + FROM account a + LEFT JOIN weight w ON a.representative_public_key = w.representative_public_key + GROUP BY a.representative_algorithm, a.representative_public_key + ON DUPLICATE KEY UPDATE weight = VALUES(weight) """, ) - suspend fun refreshWeights() + suspend fun upsertWeights() + + @Modifying + @Query( + """ + DELETE w FROM weight w + LEFT JOIN account a ON w.representative_public_key = a.representative_public_key + WHERE a.representative_public_key IS NULL OR w.weight = 0 + """, + ) + suspend fun deleteStale() + + @Modifying + @Query( + """ + UPDATE weight + SET last_vote_timestamp = :timestamp + WHERE representative_public_key = :publicKey + AND (last_vote_timestamp IS NULL OR last_vote_timestamp < :timestamp) + """, + ) + suspend fun updateLastVoteTimestamp( + publicKey: AttoPublicKey, + timestamp: Instant, + ) } diff --git a/src/main/kotlin/cash/atto/node/vote/weight/WeightService.kt b/src/main/kotlin/cash/atto/node/vote/weight/WeightService.kt index 0968b8a3..6d95488b 100644 --- a/src/main/kotlin/cash/atto/node/vote/weight/WeightService.kt +++ b/src/main/kotlin/cash/atto/node/vote/weight/WeightService.kt @@ -1,20 +1,26 @@ package cash.atto.node.vote.weight +import cash.atto.commons.AttoPublicKey import kotlinx.coroutines.flow.Flow -import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional -import java.util.concurrent.TimeUnit +import java.time.Instant @Service class WeightService( private val weightRepository: WeightRepository, ) { @Transactional - @Scheduled(fixedRate = 1, timeUnit = TimeUnit.HOURS) suspend fun refresh(): Flow { - weightRepository.deleteAll() - weightRepository.refreshWeights() + weightRepository.upsertWeights() + weightRepository.deleteStale() return weightRepository.findAll() } + + @Transactional + suspend fun updateLastVoteTimestamps(timestamps: Map) { + for ((publicKey, timestamp) in timestamps) { + weightRepository.updateLastVoteTimestamp(publicKey, timestamp) + } + } } diff --git a/src/main/resources/db/migration/V9__add_weight_last_vote_timestamp.sql b/src/main/resources/db/migration/V9__add_weight_last_vote_timestamp.sql new file mode 100644 index 00000000..f22ede12 --- /dev/null +++ b/src/main/resources/db/migration/V9__add_weight_last_vote_timestamp.sql @@ -0,0 +1,3 @@ +ALTER TABLE weight ADD PRIMARY KEY (representative_public_key); + +ALTER TABLE weight ADD COLUMN last_vote_timestamp TIMESTAMP(6) NULL;