Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import cash.atto.node.network.BroadcastNetworkMessage
import cash.atto.node.network.BroadcastStrategy
import cash.atto.node.network.NetworkMessagePublisher
import cash.atto.node.transaction.TransactionSource
import cash.atto.node.vote.VoteService
import cash.atto.protocol.AttoNode
import cash.atto.protocol.AttoTransactionPush
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand All @@ -23,10 +21,8 @@ import java.util.concurrent.TimeUnit

@Service
class ElectionProcessor(
private val thisNode: AttoNode,
private val messagePublisher: NetworkMessagePublisher,
private val accountService: AccountService,
private val voteService: VoteService,
transactionManager: ReactiveTransactionManager,
) {
private val logger = KotlinLogging.logger {}
Expand Down Expand Up @@ -85,11 +81,6 @@ class ElectionProcessor(

transactionalOperator.executeAndAwait {
accountService.add(TransactionSource.ELECTION, transactions)

if (thisNode.isHistorical()) {
val finalVotes = events.flatMap { it.votes }.filter { it.isFinal() }
voteService.saveAll(finalVotes)
}
}
} catch (e: Exception) {
throw RuntimeException("Error while processing ${events.map { it.transaction.hash }}", e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package cash.atto.node.election

import cash.atto.commons.AttoPublicKey
import cash.atto.node.vote.Vote
import cash.atto.node.vote.VoteService
import cash.atto.node.vote.weight.WeightService
import cash.atto.protocol.AttoNode
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.springframework.context.event.EventListener
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.time.Instant
import java.util.concurrent.TimeUnit

@Service
class ElectionVoteProcessor(
private val thisNode: AttoNode,
private val voteService: VoteService,
private val weightService: WeightService,
) {
private val buffer = Channel<ElectionConsensusReached>(Channel.UNLIMITED)
private val mutex = Mutex()

@EventListener
suspend fun process(event: ElectionConsensusReached) {
buffer.send(event)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MILLISECONDS)
suspend fun flush() {
if (mutex.isLocked) {
return
}
mutex.withLock {
while (!buffer.isEmpty) {
flushBatch(1_000)
}
}
}

private suspend fun flushBatch(size: Int) {
val finalVotes = mutableListOf<Vote>()
val latestTimestamps = HashMap<AttoPublicKey, Instant>()

while (finalVotes.size < size) {
val event = buffer.tryReceive().getOrNull() ?: break

for (vote in event.votes) {
if (thisNode.isHistorical() && vote.isFinal()) {
finalVotes.add(vote)
}
latestTimestamps.merge(vote.publicKey, vote.receivedAt) { old, new ->
if (new > old) new else old
}
}
}

weightService.updateLastVoteTimestamps(latestTimestamps)

if (thisNode.isHistorical() && finalVotes.isNotEmpty()) {
voteService.saveAll(finalVotes)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class VoteWeightController(
@PathVariable address: AttoAddress,
): ResponseEntity<AttoVoterWeight> {
val weight = voteWeighter.get(address.publicKey)
val lastVotedAt = voteWeighter.getLastestVote(address.publicKey)?.receivedAt?.toAtto() ?: Instant.EPOCH.toAtto()
val lastVotedAt = voteWeighter.getLatestVoteTimestamp(address.publicKey)?.toAtto() ?: Instant.EPOCH.toAtto()
val voterWeight = AttoVoterWeight(address, weight, lastVotedAt)
return ResponseEntity.ok(voterWeight)
}
Expand Down
73 changes: 25 additions & 48 deletions src/main/kotlin/cash/atto/node/vote/weight/VoteWeighter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@ import cash.atto.commons.toAttoAmount
import cash.atto.node.CacheSupport
import cash.atto.node.account.AccountUpdated
import cash.atto.node.transaction.Transaction
import cash.atto.node.vote.Vote
import cash.atto.node.vote.VoteRepository
import cash.atto.node.vote.VoteValidated
import cash.atto.protocol.AttoNode
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.annotation.PostConstruct
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.springframework.context.annotation.DependsOn
Expand All @@ -38,53 +35,40 @@ class VoteWeighter(
val thisNode: AttoNode,
val properties: VoteWeightProperties,
val weightService: WeightService,
val voteRepository: VoteRepository,
val genesisTransaction: Transaction,
) : CacheSupport {
private val logger = KotlinLogging.logger {}

private val weightMap = ConcurrentHashMap<AttoPublicKey, AttoAmount>()
private val latestVoteMap = ConcurrentHashMap<AttoPublicKey, Vote>()
private val weightMap = ConcurrentHashMap<AttoPublicKey, Weight>()
private lateinit var onlineWeight: AttoAmount
private lateinit var minimalRebroadcastWeight: AttoAmount
private lateinit var minimalConfirmationWeight: AttoAmount

@PostConstruct
override fun init() =
runBlocking {
val weights =
val allWeights =
weightService
.refresh()
.map { it.representativePublicKey to it.weight }
.toList()
.toMap()

weightMap.putAll(weights)

val minTimestamp = getMinTimestamp()
val voteMap =
voteRepository
.findLatestAfter(minTimestamp)
.asSequence()
.map { it.publicKey to it }
.toMap()
latestVoteMap.putAll(voteMap)

weightMap.putAll(allWeights.associateBy { it.representativePublicKey })

calculateMinimalWeights()
}

override fun clear() {
weightMap.clear()
latestVoteMap.clear()
}

@EventListener
fun listen(event: VoteValidated) {
val vote = event.vote
latestVoteMap.compute(vote.publicKey) { _, previousHashVote ->
if (previousHashVote == null || vote.receivedAt > previousHashVote.receivedAt) {
vote
weightMap.computeIfPresent(vote.publicKey) { _, existing ->
if (existing.lastVoteTimestamp == null || vote.receivedAt > existing.lastVoteTimestamp) {
existing.copy(lastVoteTimestamp = vote.receivedAt)
} else {
previousHashVote
existing
}
}
}
Expand Down Expand Up @@ -123,26 +107,19 @@ class VoteWeighter(
logger.trace { "Weight updated $weightMap" }
}

fun getMap(): LinkedHashMap<AttoPublicKey, AttoAmount> =
weightMap.entries
.asSequence()
.sortedByDescending { it.value }
.associate { it.key to it.value }
.toMap(LinkedHashMap())

fun getLastestVote(publicKey: AttoPublicKey): Vote? = latestVoteMap[publicKey]
fun getLatestVoteTimestamp(publicKey: AttoPublicKey): Instant? = weightMap[publicKey]?.lastVoteTimestamp

private suspend fun add(
publicKey: AttoPublicKey,
amount: AttoAmount,
defaultAmount: AttoAmount,
) {
retryUntilSuccess {
weightMap.compute(publicKey) { _, weight ->
if (weight == null) {
defaultAmount
weightMap.compute(publicKey) { _, existing ->
if (existing == null) {
Weight(representativePublicKey = publicKey, weight = defaultAmount)
} else {
weight + amount
existing.copy(weight = existing.weight + amount)
}
}
}
Expand All @@ -154,25 +131,25 @@ class VoteWeighter(
defaultAmount: AttoAmount,
) {
retryUntilSuccess {
weightMap.compute(publicKey) { _, weight ->
weightMap.compute(publicKey) { _, existing ->
val newWeight =
if (weight == null) {
if (existing == null) {
defaultAmount
} else {
weight - amount
existing.weight - amount
}
if (newWeight > AttoAmount.MIN) {
return@compute newWeight
return@compute (existing ?: Weight(representativePublicKey = publicKey, weight = newWeight)).copy(weight = newWeight)
} else {
return@compute null
}
}
}
}

fun getAll(): Map<AttoPublicKey, AttoAmount> = weightMap.toMap()
fun getAll(): Map<AttoPublicKey, AttoAmount> = weightMap.mapValues { it.value.weight }

fun get(publicKey: AttoPublicKey): AttoAmount = weightMap[publicKey] ?: AttoAmount.MIN
fun get(publicKey: AttoPublicKey): AttoAmount = weightMap[publicKey]?.weight ?: AttoAmount.MIN

fun get(): AttoAmount = get(thisNode.publicKey)

Expand All @@ -199,11 +176,11 @@ class VoteWeighter(
val onlineWeights =
weightMap
.asSequence()
.filter { minTimestamp < (latestVoteMap[it.key]?.receivedAt ?: Instant.MIN) }
.sortedByDescending { it.value.raw }
.filter { minTimestamp < (it.value.lastVoteTimestamp ?: Instant.MIN) }
.sortedByDescending { it.value.weight.raw }
.toList()

val onlineWeight = onlineWeights.sumOf { it.value.raw }
val onlineWeight = onlineWeights.sumOf { it.value.weight.raw }

this.onlineWeight = onlineWeight.toAttoAmount()

Expand All @@ -223,9 +200,9 @@ class VoteWeighter(
logger.info { "Minimal confirmation weight updated to ${this.minimalConfirmationWeight}" }

if (onlineWeights.size >= 10) {
this.minimalRebroadcastWeight = onlineWeights[9].value
this.minimalRebroadcastWeight = onlineWeights[9].value.weight
} else if (onlineWeights.isNotEmpty()) {
this.minimalRebroadcastWeight = onlineWeights.last().value
this.minimalRebroadcastWeight = onlineWeights.last().value.weight
} else {
this.minimalRebroadcastWeight = properties.minimalRebroadcastWeight!!.replace("_", "").toAttoAmount()
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/cash/atto/node/vote/weight/Weight.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package cash.atto.node.vote.weight
import cash.atto.commons.AttoAmount
import cash.atto.commons.AttoPublicKey
import org.springframework.data.annotation.Id
import java.time.Instant

data class Weight(
@Id
val representativePublicKey: AttoPublicKey,
val weight: AttoAmount,
val lastVoteTimestamp: Instant? = null,
)
36 changes: 31 additions & 5 deletions src/main/kotlin/cash/atto/node/vote/weight/WeightRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,43 @@ import cash.atto.commons.AttoPublicKey
import org.springframework.data.r2dbc.repository.Modifying
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import java.time.Instant

interface WeightRepository : CoroutineCrudRepository<Weight, AttoPublicKey> {
@Modifying
@Query(
"""
INSERT INTO weight (representative_algorithm, representative_public_key, weight)
SELECT representative_algorithm, representative_public_key, CAST(SUM(balance) AS UNSIGNED) AS weight
FROM account
GROUP BY representative_algorithm, representative_public_key
HAVING weight > 0
SELECT a.representative_algorithm, a.representative_public_key, CAST(SUM(a.balance) AS UNSIGNED) AS weight
FROM account a
LEFT JOIN weight w ON a.representative_public_key = w.representative_public_key
GROUP BY a.representative_algorithm, a.representative_public_key
ON DUPLICATE KEY UPDATE weight = VALUES(weight)
""",
)
suspend fun refreshWeights()
suspend fun upsertWeights()

@Modifying
@Query(
"""
DELETE w FROM weight w
LEFT JOIN account a ON w.representative_public_key = a.representative_public_key
WHERE a.representative_public_key IS NULL OR w.weight = 0
""",
)
suspend fun deleteStale()

@Modifying
@Query(
"""
UPDATE weight
SET last_vote_timestamp = :timestamp
WHERE representative_public_key = :publicKey
AND (last_vote_timestamp IS NULL OR last_vote_timestamp < :timestamp)
""",
)
suspend fun updateLastVoteTimestamp(
publicKey: AttoPublicKey,
timestamp: Instant,
)
}
16 changes: 11 additions & 5 deletions src/main/kotlin/cash/atto/node/vote/weight/WeightService.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package cash.atto.node.vote.weight

import cash.atto.commons.AttoPublicKey
import kotlinx.coroutines.flow.Flow
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.concurrent.TimeUnit
import java.time.Instant

@Service
class WeightService(
private val weightRepository: WeightRepository,
) {
@Transactional
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.HOURS)
suspend fun refresh(): Flow<Weight> {
weightRepository.deleteAll()
weightRepository.refreshWeights()
weightRepository.upsertWeights()
weightRepository.deleteStale()
return weightRepository.findAll()
}

@Transactional
suspend fun updateLastVoteTimestamps(timestamps: Map<AttoPublicKey, Instant>) {
for ((publicKey, timestamp) in timestamps) {
weightRepository.updateLastVoteTimestamp(publicKey, timestamp)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE weight ADD PRIMARY KEY (representative_public_key);

ALTER TABLE weight ADD COLUMN last_vote_timestamp TIMESTAMP(6) NULL;
Loading