From ce19a7b2d44a9ed3e1a2d78c373470ddf58f6290 Mon Sep 17 00:00:00 2001 From: rotilho Date: Wed, 11 Mar 2026 18:42:25 +0100 Subject: [PATCH 1/7] Do not vote when consensus is reached --- .../cash/atto/node/election/ElectionVoter.kt | 1 - .../atto/node/election/ElectionVoterTest.kt | 38 +++++++++++++++++-- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt index 34e94eeb..94d08a85 100644 --- a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt +++ b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt @@ -123,7 +123,6 @@ class ElectionVoter( private suspend fun consensusReached(transaction: Transaction) { remove(transaction) - vote(transaction, Instant.now()) } @EventListener diff --git a/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt b/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt index 4081ced0..a3f6edae 100644 --- a/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt +++ b/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt @@ -219,7 +219,7 @@ class ElectionVoterTest { } @Test - fun `should send final vote when consensus reached`() { + fun `should NOT send vote when consensus reached`() { // given val transaction = Transaction.sample() @@ -229,7 +229,7 @@ class ElectionVoterTest { } // then - verify(exactly = 1) { + verify(exactly = 0) { messagePublisher.publish( match { message -> message as BroadcastNetworkMessage @@ -237,7 +237,7 @@ class ElectionVoterTest { }, ) } - verify(exactly = 1) { + verify(exactly = 0) { eventPublisher.publish( match { event -> event as VoteValidated @@ -247,6 +247,38 @@ class ElectionVoterTest { } } + @Test + fun `should send only vote and final vote in election flow`() { + // given + val transaction = Transaction.sample() + val accountUpdated = AccountUpdated(TransactionSource.ELECTION, account, account, transaction) + + // when + runBlocking { + electionVoter.process(ElectionStarted(account, transaction)) + electionVoter.process(ElectionConsensusReached(account, transaction, emptySet())) + electionVoter.process(accountUpdated) + } + + // then + verify(exactly = 1) { + messagePublisher.publish( + match { message -> + message as BroadcastNetworkMessage + message.strategy == BroadcastStrategy.VOTERS + }, + ) + } + verify(exactly = 1) { + messagePublisher.publish( + match { message -> + message as BroadcastNetworkMessage + message.strategy == BroadcastStrategy.EVERYONE + }, + ) + } + } + @Test fun `should send final vote when account is updated`() { // given From 4e6459e59f30881f873b433b2133493eecd028f9 Mon Sep 17 00:00:00 2001 From: rotilho Date: Thu, 12 Mar 2026 10:49:29 +0100 Subject: [PATCH 2/7] Benchmark ElectionVoter --- build.gradle.kts | 24 ++ .../node/election/ElectionVoterBenchmark.kt | 259 ++++++++++++++++++ 2 files changed, 283 insertions(+) create mode 100644 src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt diff --git a/build.gradle.kts b/build.gradle.kts index 75cb282e..f268fb43 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,10 +1,14 @@ +import kotlinx.benchmark.gradle.JvmBenchmarkTarget + plugins { val kotlinVersion = "2.3.0" kotlin("jvm") version kotlinVersion kotlin("plugin.serialization") version kotlinVersion kotlin("plugin.spring") version kotlinVersion + kotlin("plugin.allopen") version kotlinVersion + id("org.jetbrains.kotlinx.benchmark") version "0.4.14" id("org.springframework.boot") version "3.5.11" id("org.graalvm.buildtools.native") version "0.11.4" id("org.jlleitschuh.gradle.ktlint") version "14.0.1" @@ -18,10 +22,18 @@ java { } } +sourceSets { + create("benchmark") +} + kotlin { compilerOptions { freeCompilerArgs.addAll("-Xjsr305=strict") } + + target.compilations + .getByName("benchmark") + .associateWith(target.compilations.getByName("main")) } repositories { @@ -41,6 +53,10 @@ configurations { } } +allOpen { + annotation("org.openjdk.jmh.annotations.State") +} + ext["kotlin-coroutines.version"] = "1.9.0" ext["kotlin-serialization.version"] = "1.8.0" @@ -120,6 +136,8 @@ dependencies { testImplementation("org.testcontainers:junit-jupiter") testImplementation("org.testcontainers:mysql") testImplementation("org.testcontainers:r2dbc") + add("benchmarkImplementation", "org.jetbrains.kotlinx:kotlinx-benchmark-runtime:0.4.14") + } tasks.withType { @@ -138,3 +156,9 @@ graalvmNative { } } } + +benchmark { + targets { + register("benchmark") + } +} diff --git a/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt b/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt new file mode 100644 index 00000000..ab0b37fb --- /dev/null +++ b/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt @@ -0,0 +1,259 @@ +package cash.atto.node.election + +import cash.atto.commons.AttoAddress +import cash.atto.commons.AttoAlgorithm +import cash.atto.commons.AttoAmount +import cash.atto.commons.AttoHash +import cash.atto.commons.AttoInstant +import cash.atto.commons.AttoNetwork +import cash.atto.commons.AttoPublicKey +import cash.atto.commons.AttoReceiveBlock +import cash.atto.commons.AttoSignature +import cash.atto.commons.AttoSigner +import cash.atto.commons.AttoWork +import cash.atto.commons.toAttoHeight +import cash.atto.commons.toAttoVersion +import cash.atto.node.EventPublisher +import cash.atto.node.account.Account +import cash.atto.node.account.AccountRepository +import cash.atto.node.account.AccountUpdated +import cash.atto.node.network.NetworkMessagePublisher +import cash.atto.node.transaction.Transaction +import cash.atto.node.transaction.TransactionSource +import cash.atto.node.vote.VoteRepository +import cash.atto.node.vote.weight.VoteWeightProperties +import cash.atto.node.vote.weight.VoteWeighter +import cash.atto.node.vote.weight.WeightRepository +import cash.atto.node.vote.weight.WeightService +import cash.atto.protocol.AttoNode +import cash.atto.protocol.NodeFeature +import kotlinx.benchmark.Benchmark +import kotlinx.benchmark.Scope +import kotlinx.benchmark.Setup +import kotlinx.benchmark.State +import kotlinx.benchmark.TearDown +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.runBlocking +import org.openjdk.jmh.annotations.Level +import org.openjdk.jmh.annotations.Threads +import org.springframework.context.ApplicationEvent +import org.springframework.context.ApplicationEventPublisher +import java.lang.reflect.Proxy +import java.net.URI +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import kotlin.time.Duration.Companion.milliseconds + +@State(Scope.Benchmark) +class ElectionVoterFlowBenchmark { + private companion object { + val EVENT_TIMESTAMP: Instant = Instant.EPOCH + } + + private lateinit var electionVoter: ElectionVoter + private lateinit var account: Account + + private lateinit var node: AttoNode + private lateinit var signer: AttoSigner + private lateinit var voteWeighter: VoteWeighter + private val accountRepository: AccountRepository = NoOpAccountRepository() + private val nextHeight = AtomicInteger(1) + + @Setup(Level.Trial) + fun setUp() { + val algorithm = AttoAlgorithm.V1 + val publicKey = AttoPublicKey(ByteArray(32) { index -> index.toByte() }) + + node = + AttoNode( + network = AttoNetwork.LOCAL, + protocolVersion = 1U, + algorithm = algorithm, + publicKey = publicKey, + publicUri = URI("ws://127.0.0.1:7070"), + features = setOf(NodeFeature.VOTING), + ) + signer = BenchmarkSigner(algorithm, publicKey) + voteWeighter = createVoteWeighter(node) + + val springPublisher = NoOpSpringPublisher() + electionVoter = + ElectionVoter( + thisNode = node, + signer = signer, + voteWeighter = voteWeighter, + eventPublisher = EventPublisher(springPublisher), + messagePublisher = NetworkMessagePublisher(springPublisher), + accountRepository = accountRepository, + ) + account = createAccount(node) + } + + @Benchmark + @Threads(10) + fun processElectionFlow() = + runBlocking { + val transaction = createTransaction(node, nextHeight.getAndIncrement().toUInt()) + val accountUpdated = + AccountUpdated( + source = TransactionSource.ELECTION, + previousAccount = account, + updatedAccount = account, + transaction = transaction, + timestamp = EVENT_TIMESTAMP, + ) + + electionVoter.process(ElectionStarted(account, transaction, EVENT_TIMESTAMP)) + electionVoter.process( + ElectionConsensusChanged( + account = account, + transaction = transaction, + timestamp = EVENT_TIMESTAMP.plusNanos(1), + ), + ) + electionVoter.process( + ElectionConsensusReached( + account = account, + transaction = transaction, + votes = emptySet(), + timestamp = EVENT_TIMESTAMP.plusNanos(2), + ), + ) + electionVoter.process(accountUpdated) + } + + @TearDown(Level.Trial) + fun tearDownTrial() { + electionVoter.close() + } +} + +private class BenchmarkSigner( + override val algorithm: AttoAlgorithm, + override val publicKey: AttoPublicKey, +) : AttoSigner { + override val address: AttoAddress = AttoAddress(algorithm, publicKey) + + private val signature = AttoSignature(ByteArray(64) { 1 }) + + override suspend fun sign(hash: AttoHash): AttoSignature { + delay(100.milliseconds) + return signature + } +} + +private class NoOpSpringPublisher : ApplicationEventPublisher { + override fun publishEvent(event: ApplicationEvent) { + // no-op for benchmarking + } + + override fun publishEvent(event: Any) { + // no-op for benchmarking + } +} + +private class NoOpAccountRepository : AccountRepository { + override fun saveAll(entities: List): Flow = emptyFlow() + + override suspend fun findById(id: AttoPublicKey): Account? = null + + override fun findAllById(ids: Iterable): Flow = emptyFlow() + + override suspend fun deleteAll() {} +} + +private fun createAccount(node: AttoNode): Account = + Account( + publicKey = node.publicKey, + network = node.network, + version = 0U.toAttoVersion(), + algorithm = node.algorithm, + height = 0, + balance = AttoAmount.MIN, + lastTransactionTimestamp = Instant.EPOCH, + lastTransactionHash = AttoHash(ByteArray(32)), + representativeAlgorithm = node.algorithm, + representativePublicKey = node.publicKey, + ) + +private fun createTransaction( + node: AttoNode, + height: UInt, +): Transaction { + val seed = height.toInt() + val block = + AttoReceiveBlock( + version = 0U.toAttoVersion(), + network = node.network, + algorithm = node.algorithm, + publicKey = node.publicKey, + height = height.toAttoHeight(), + balance = AttoAmount.MAX, + timestamp = AttoInstant.now(), + previous = AttoHash(ByteArray(32) { index -> (seed + index).toByte() }), + sendHashAlgorithm = node.algorithm, + sendHash = AttoHash(ByteArray(32) { index -> (seed * 17 + index).toByte() }), + ) + + return Transaction( + block = block, + signature = AttoSignature(ByteArray(64) { index -> (seed + index).toByte() }), + work = AttoWork(ByteArray(8) { index -> (seed * 3 + index).toByte() }), + ) +} + +private fun createVoteWeighter(node: AttoNode): VoteWeighter { + val properties = + VoteWeightProperties().apply { + minimalConfirmationWeight = "1" + confirmationThreshold = 1 + minimalRebroadcastWeight = "1" + samplePeriodInDays = 1 + } + val voteWeighter = + VoteWeighter( + thisNode = node, + properties = properties, + weightService = WeightService(interfaceProxy()), + voteRepository = interfaceProxy(), + genesisTransaction = createTransaction(node, 1U), + ) + + val weightMapField = VoteWeighter::class.java.getDeclaredField("weightMap") + weightMapField.isAccessible = true + @Suppress("UNCHECKED_CAST") + val weightMap = weightMapField.get(voteWeighter) as ConcurrentHashMap + weightMap[node.publicKey] = AttoAmount.MAX + + return voteWeighter +} + +private inline fun interfaceProxy(): T = + Proxy + .newProxyInstance( + T::class.java.classLoader, + arrayOf(T::class.java), + ) { proxy, method, args -> + when (method.name) { + "toString" -> "${T::class.java.simpleName}Proxy" + "hashCode" -> System.identityHashCode(proxy) + "equals" -> proxy === args?.singleOrNull() + else -> defaultValue(method.returnType) + } + } as T + +private fun defaultValue(returnType: Class<*>): Any? = + when (returnType) { + java.lang.Boolean.TYPE -> false + java.lang.Byte.TYPE -> 0.toByte() + java.lang.Short.TYPE -> 0.toShort() + java.lang.Integer.TYPE -> 0 + java.lang.Long.TYPE -> 0L + java.lang.Float.TYPE -> 0f + java.lang.Double.TYPE -> 0.0 + java.lang.Character.TYPE -> 0.toChar() + else -> null + } From da2e98200d8412e235100c28ac745347133e905d Mon Sep 17 00:00:00 2001 From: rotilho Date: Thu, 12 Mar 2026 16:02:16 +0100 Subject: [PATCH 3/7] Allow `ElectionVoter` to cast concurrent votes --- .../node/election/ElectionVoterBenchmark.kt | 20 +- .../cash/atto/node/election/ElectionVoter.kt | 291 +++++++++--------- .../atto/node/election/ElectionVoterTest.kt | 23 +- 3 files changed, 174 insertions(+), 160 deletions(-) diff --git a/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt b/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt index ab0b37fb..65b4d7c1 100644 --- a/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt +++ b/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt @@ -93,18 +93,10 @@ class ElectionVoterFlowBenchmark { } @Benchmark - @Threads(10) + @Threads(100) fun processElectionFlow() = runBlocking { val transaction = createTransaction(node, nextHeight.getAndIncrement().toUInt()) - val accountUpdated = - AccountUpdated( - source = TransactionSource.ELECTION, - previousAccount = account, - updatedAccount = account, - transaction = transaction, - timestamp = EVENT_TIMESTAMP, - ) electionVoter.process(ElectionStarted(account, transaction, EVENT_TIMESTAMP)) electionVoter.process( @@ -122,7 +114,15 @@ class ElectionVoterFlowBenchmark { timestamp = EVENT_TIMESTAMP.plusNanos(2), ), ) - electionVoter.process(accountUpdated) + electionVoter.process( + AccountUpdated( + source = TransactionSource.ELECTION, + previousAccount = account, + updatedAccount = account, + transaction = transaction, + timestamp = EVENT_TIMESTAMP, + ) + ) } @TearDown(Level.Trial) diff --git a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt index 94d08a85..e02de58f 100644 --- a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt +++ b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt @@ -1,7 +1,6 @@ package cash.atto.node.election import cash.atto.commons.AttoAmount -import cash.atto.commons.AttoHash import cash.atto.commons.AttoSignedVote import cash.atto.commons.AttoSigner import cash.atto.commons.AttoUnit @@ -41,7 +40,10 @@ import org.springframework.context.event.EventListener import org.springframework.stereotype.Service import java.math.BigDecimal import java.time.Instant +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors +import kotlin.concurrent.atomics.AtomicBoolean +import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.time.toKotlinDuration @Service @@ -57,20 +59,14 @@ class ElectionVoter( companion object { val MIN_WEIGHT = AttoAmount.from(AttoUnit.ATTO, BigDecimal.valueOf(1_000_000).toString()) // 1M - val finalVoteTimestamp = AttoVote.finalTimestamp.toJavaInstant() } private val scope = CoroutineScope(Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher() + SupervisorJob()) - private val mutex = Mutex() - - private val consensusMap = HashMap() - - private val pending = HashMap() + private val consensusMap = ConcurrentHashMap() override fun clear() { consensusMap.clear() - pending.clear() } @PreDestroy @@ -80,81 +76,40 @@ class ElectionVoter( scope.cancel() } - private suspend fun consensed( - transaction: Transaction, - consensusTimestamp: Instant, - ) { + private fun consensusFor(transaction: Transaction): Consensus { val publicKeyHeight = transaction.toPublicKeyHeight() - - val oldConsensus = consensusMap[publicKeyHeight] - val newConsensus = Consensus(transaction, consensusTimestamp) - - if (oldConsensus != null && oldConsensus.consensusTimestamp > newConsensus.consensusTimestamp) { - return - } - - consensusMap[publicKeyHeight] = newConsensus - - if (oldConsensus != null && oldConsensus.transaction == newConsensus.transaction) { - return - } - - logger.trace { "Consensus changed from ${oldConsensus?.hash} to ${transaction.hash}" } - - if (oldConsensus == null) { - vote(transaction, Instant.now()) - } else { - voteAsynchronously(transaction, consensusTimestamp) - } + return consensusMap.computeIfAbsent(publicKeyHeight) { Consensus(transaction) } } @EventListener suspend fun process(event: ElectionStarted) { - val publicKeyHeight = event.transaction.toPublicKeyHeight() - - mutex.withLock { - if (consensusMap[publicKeyHeight] != null) { - return - } - - consensed(event.transaction, event.timestamp) - } - } - - private suspend fun consensusReached(transaction: Transaction) { - remove(transaction) + val consensus = consensusFor(event.transaction) + consensus.start(event.timestamp) } @EventListener suspend fun process(event: ElectionConsensusChanged) { - val transaction = event.transaction - - mutex.withLock { - consensed(transaction, event.timestamp) - } + val consensus = consensusFor(event.transaction) + consensus.update(event.transaction, event.timestamp) } @EventListener suspend fun process(event: ElectionConsensusReached) { - mutex.withLock { - consensusReached(event.transaction) - } + val consensus = consensusFor(event.transaction) + consensus.update(event.transaction, event.timestamp) } @EventListener suspend fun process(event: ElectionExpiring) { - val publicKeyHeight = event.transaction.toPublicKeyHeight() - mutex.withLock { - val consensus = consensusMap[publicKeyHeight] ?: return - vote(consensus.transaction, Instant.now()) - } + val consensus = consensusFor(event.transaction) + consensus.reaffirm() } @EventListener - suspend fun process(event: ElectionExpired) = - mutex.withLock { - remove(event.transaction) - } + suspend fun process(event: ElectionExpired) { + val consensus = consensusFor(event.transaction) + consensus.remove() + } @EventListener suspend fun process(event: AccountUpdated) { @@ -162,9 +117,9 @@ class ElectionVoter( return } - mutex.withLock { - vote(event.transaction, finalVoteTimestamp) - } + val consensus = consensusFor(event.transaction) + consensus.finalVote(event.transaction) + consensus.remove() } @EventListener @@ -178,92 +133,148 @@ class ElectionVoter( return } - mutex.withLock { - vote(event.transaction, finalVoteTimestamp) - } + val consensus = consensusFor(event.transaction) + consensus.finalVote(event.transaction) } - /** - * During the election phase, multiple transactions at the same height can trigger a flurry of votes, - * making the provisional consensus bounce back and forth. - * - * Pausing for 5 seconds lets the network accumulate all votes first, so we cast a new vote - * against a more stable consensus instead of chasing transient shifts. - */ - private suspend fun voteAsynchronously( - transaction: Transaction, - timestamp: Instant, + private fun canVote(weight: AttoAmount): Boolean = thisNode.isVoter() && weight >= MIN_WEIGHT + + + @OptIn(ExperimentalAtomicApi::class) + private inner class Consensus( + private var transaction: Transaction, ) { - val publicKeyHeight = transaction.toPublicKeyHeight() - pending.remove(publicKeyHeight)?.cancel() - val job = - scope.launch { - delay(Election.ELECTION_STABILITY_MINIMAL_TIME.toKotlinDuration()) - mutex.withLock { - vote(transaction, timestamp) - pending.remove(publicKeyHeight) - } + private val mutex = Mutex() + private val started = AtomicBoolean(false) + private val publicKeyHeight = transaction.toPublicKeyHeight() + private var consensusTimestamp = transaction.block.timestamp.toJavaInstant() + private var pendingJob: Job? = null + + suspend fun start( + timestamp: Instant, + ) = mutex.withLock { + if (!started.compareAndSet(expectedValue = false, newValue = true)) { + return@withLock } - pending[publicKeyHeight] = job - } + applyConsensus(transaction, timestamp, forceVote = true) + } - private suspend fun vote( - transaction: Transaction, - timestamp: Instant, - ) { - val weight = voteWeighter.get() - if (!canVote(weight)) { - logger.trace { "This node can't vote yet" } - return + private fun checkStarted() { + require(started.load()) { "Consensus must be started before calling this method" } } - val attoVote = - AttoVote( - version = 0U.toAttoVersion(), - algorithm = thisNode.algorithm, - publicKey = thisNode.publicKey, - blockAlgorithm = transaction.algorithm, - blockHash = transaction.hash, - timestamp = timestamp.toAtto(), - ) - val attoSignedVote = - AttoSignedVote( - vote = attoVote, - signature = signer.sign(attoVote), - ) - - val votePush = - AttoVotePush( - vote = attoSignedVote, - ) - - val strategy = - if (attoVote.isFinal()) { - BroadcastStrategy.EVERYONE - } else { - BroadcastStrategy.VOTERS + suspend fun update( + transaction: Transaction, + timestamp: Instant, + ) = mutex.withLock { + checkStarted() + applyConsensus(transaction, timestamp) + } + + suspend fun reaffirm() = mutex.withLock { + checkStarted() + publishVote(transaction, Instant.now()) + } + + + suspend fun finalVote(transaction: Transaction) = mutex.withLock { + checkStarted() + publishVote(transaction, AttoVote.finalTimestamp.toJavaInstant()) + } + + private suspend fun publishVote( + transaction: Transaction, + timestamp: Instant, + ) { + val weight = voteWeighter.get() + if (!canVote(weight)) { + logger.trace { "This node can't vote yet" } + return } - logger.debug { "Sending to $strategy $votePush" } + val attoVote = + AttoVote( + version = 0U.toAttoVersion(), + algorithm = thisNode.algorithm, + publicKey = thisNode.publicKey, + blockAlgorithm = transaction.algorithm, + blockHash = transaction.hash, + timestamp = timestamp.toAtto(), + ) + val attoSignedVote = + AttoSignedVote( + vote = attoVote, + signature = signer.sign(attoVote), + ) + + val votePush = + AttoVotePush( + vote = attoSignedVote, + ) + + val strategy = + if (attoVote.isFinal()) { + BroadcastStrategy.EVERYONE + } else { + BroadcastStrategy.VOTERS + } - messagePublisher.publish(BroadcastNetworkMessage(strategy, emptySet(), votePush)) - eventPublisher.publish(VoteValidated(transaction, Vote.from(weight, attoSignedVote))) - } + logger.debug { "Sending to $strategy $votePush" } - private fun canVote(weight: AttoAmount): Boolean = thisNode.isVoter() && weight >= MIN_WEIGHT + messagePublisher.publish(BroadcastNetworkMessage(strategy, emptySet(), votePush)) + eventPublisher.publish(VoteValidated(transaction, Vote.from(weight, attoSignedVote))) + } - private suspend fun remove(transaction: Transaction) { - val publicKeyHeight = transaction.toPublicKeyHeight() - consensusMap.remove(publicKeyHeight) - pending.remove(publicKeyHeight)?.cancel() - logger.trace { "Removed ${transaction.hash} from the voter queue" } - } - private data class Consensus( - val transaction: Transaction, - val consensusTimestamp: Instant, - ) { - val hash: AttoHash - get() = transaction.hash + private suspend fun applyConsensus( + transaction: Transaction, + timestamp: Instant, + forceVote: Boolean = false + ) { + val oldTransaction = this.transaction + val oldTimestamp = this.consensusTimestamp + + if (oldTimestamp >= timestamp) { + return + } + + this.transaction = transaction + this.consensusTimestamp = timestamp + + if (!forceVote && oldTransaction == transaction) { + return + } + + logger.trace { "Consensus changed from ${oldTransaction.hash} to ${transaction.hash}" } + + if (oldTransaction == transaction) { + publishVote(transaction, timestamp) + } else { + scheduleDelayedVote(transaction, timestamp) + } + } + + private fun scheduleDelayedVote( + transaction: Transaction, + timestamp: Instant, + ) { + pendingJob?.cancel() + val job = + scope.launch { + delay(Election.ELECTION_STABILITY_MINIMAL_TIME.toKotlinDuration()) + mutex.withLock { + publishVote(transaction, timestamp) + pendingJob = null + } + } + pendingJob = job + } + + + suspend fun remove() = mutex.withLock { + pendingJob?.cancel() + consensusMap.remove(publicKeyHeight) + logger.trace { "Removed ${transaction.hash} from the voter queue" } + } } } diff --git a/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt b/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt index a3f6edae..474429cb 100644 --- a/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt +++ b/src/test/kotlin/cash/atto/node/election/ElectionVoterTest.kt @@ -219,17 +219,18 @@ class ElectionVoterTest { } @Test - fun `should NOT send vote when consensus reached`() { + fun `should NOT send additional vote when consensus reached`() { // given val transaction = Transaction.sample() // when runBlocking { + electionVoter.process(ElectionStarted(account, transaction)) electionVoter.process(ElectionConsensusReached(account, transaction, emptySet())) } - // then - verify(exactly = 0) { + // then — only the initial vote from ElectionStarted, no extra vote from ConsensusReached + verify(exactly = 1) { messagePublisher.publish( match { message -> message as BroadcastNetworkMessage @@ -237,7 +238,7 @@ class ElectionVoterTest { }, ) } - verify(exactly = 0) { + verify(exactly = 1) { eventPublisher.publish( match { event -> event as VoteValidated @@ -287,6 +288,7 @@ class ElectionVoterTest { // when runBlocking { + electionVoter.process(ElectionStarted(account, transaction)) electionVoter.process(accountUpdated) } @@ -319,23 +321,24 @@ class ElectionVoterTest { // when runBlocking { + electionVoter.process(ElectionStarted(account, transaction)) electionVoter.process(TransactionRejected(TransactionRejectionReason.OLD_TRANSACTION, "Test", account, transaction)) } - // then + // then — initial vote (VOTERS) + final vote (EVERYONE) verify(exactly = 1) { messagePublisher.publish( match { message -> message as BroadcastNetworkMessage - message.strategy == BroadcastStrategy.EVERYONE + message.strategy == BroadcastStrategy.VOTERS }, ) } verify(exactly = 1) { - eventPublisher.publish( - match { event -> - event as VoteValidated - event.transaction == transaction + messagePublisher.publish( + match { message -> + message as BroadcastNetworkMessage + message.strategy == BroadcastStrategy.EVERYONE }, ) } From d5af8e59f7f3b4b7b9dd4598940fdd8d39ded3c5 Mon Sep 17 00:00:00 2001 From: rotilho Date: Thu, 12 Mar 2026 22:10:20 +0100 Subject: [PATCH 4/7] Cancel old vote job --- .../cash/atto/node/election/Election.kt | 2 +- .../cash/atto/node/election/ElectionVoter.kt | 140 ++++++++---------- 2 files changed, 65 insertions(+), 77 deletions(-) diff --git a/src/main/kotlin/cash/atto/node/election/Election.kt b/src/main/kotlin/cash/atto/node/election/Election.kt index 509c04ee..87dcee2f 100644 --- a/src/main/kotlin/cash/atto/node/election/Election.kt +++ b/src/main/kotlin/cash/atto/node/election/Election.kt @@ -34,7 +34,7 @@ class Election( private val logger = KotlinLogging.logger {} companion object { - val ELECTION_STABILITY_MINIMAL_TIME = Duration.ofSeconds(5) + val ELECTION_STABILITY_MINIMAL_TIME: Duration = Duration.ofSeconds(5) } private val mutex = Mutex() diff --git a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt index e02de58f..6193d736 100644 --- a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt +++ b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt @@ -76,39 +76,40 @@ class ElectionVoter( scope.cancel() } - private fun consensusFor(transaction: Transaction): Consensus { - val publicKeyHeight = transaction.toPublicKeyHeight() - return consensusMap.computeIfAbsent(publicKeyHeight) { Consensus(transaction) } + private fun consensusFor(transaction: Transaction): Consensus? { + return consensusMap[transaction.toPublicKeyHeight()] } @EventListener suspend fun process(event: ElectionStarted) { - val consensus = consensusFor(event.transaction) + val transaction = event.transaction + val publicKeyHeight = transaction.toPublicKeyHeight() + val consensus = consensusMap.computeIfAbsent(publicKeyHeight) { Consensus(transaction) } consensus.start(event.timestamp) } @EventListener suspend fun process(event: ElectionConsensusChanged) { - val consensus = consensusFor(event.transaction) + val consensus = consensusFor(event.transaction) ?: return consensus.update(event.transaction, event.timestamp) } @EventListener suspend fun process(event: ElectionConsensusReached) { - val consensus = consensusFor(event.transaction) + val consensus = consensusFor(event.transaction) ?: return consensus.update(event.transaction, event.timestamp) } @EventListener suspend fun process(event: ElectionExpiring) { - val consensus = consensusFor(event.transaction) + val consensus = consensusFor(event.transaction) ?: return consensus.reaffirm() } @EventListener suspend fun process(event: ElectionExpired) { - val consensus = consensusFor(event.transaction) - consensus.remove() + val consensus = consensusFor(event.transaction) ?: return + consensus.expire() } @EventListener @@ -117,9 +118,8 @@ class ElectionVoter( return } - val consensus = consensusFor(event.transaction) + val consensus = consensusFor(event.transaction) ?: return consensus.finalVote(event.transaction) - consensus.remove() } @EventListener @@ -133,7 +133,7 @@ class ElectionVoter( return } - val consensus = consensusFor(event.transaction) + val consensus = Consensus(event.transaction) consensus.finalVote(event.transaction) } @@ -148,7 +148,7 @@ class ElectionVoter( private val started = AtomicBoolean(false) private val publicKeyHeight = transaction.toPublicKeyHeight() private var consensusTimestamp = transaction.block.timestamp.toJavaInstant() - private var pendingJob: Job? = null + private var job: Job? = null suspend fun start( timestamp: Instant, @@ -159,32 +159,38 @@ class ElectionVoter( applyConsensus(transaction, timestamp, forceVote = true) } - private fun checkStarted() { - require(started.load()) { "Consensus must be started before calling this method" } + private fun remove() { + job?.cancel() + if (consensusMap.remove(publicKeyHeight) != null) { + logger.trace { "Removed ${transaction.hash} from the voter" } + } } suspend fun update( transaction: Transaction, timestamp: Instant, ) = mutex.withLock { - checkStarted() applyConsensus(transaction, timestamp) } suspend fun reaffirm() = mutex.withLock { - checkStarted() - publishVote(transaction, Instant.now()) + publishVote(transaction, Instant.now(), consensusChanged = false) } suspend fun finalVote(transaction: Transaction) = mutex.withLock { - checkStarted() - publishVote(transaction, AttoVote.finalTimestamp.toJavaInstant()) + publishVote(transaction, AttoVote.finalTimestamp.toJavaInstant(), consensusChanged = false) + remove() + } + + suspend fun expire() = mutex.withLock { + remove() } private suspend fun publishVote( transaction: Transaction, timestamp: Instant, + consensusChanged: Boolean = false, ) { val weight = voteWeighter.get() if (!canVote(weight)) { @@ -192,37 +198,46 @@ class ElectionVoter( return } - val attoVote = - AttoVote( - version = 0U.toAttoVersion(), - algorithm = thisNode.algorithm, - publicKey = thisNode.publicKey, - blockAlgorithm = transaction.algorithm, - blockHash = transaction.hash, - timestamp = timestamp.toAtto(), - ) - val attoSignedVote = - AttoSignedVote( - vote = attoVote, - signature = signer.sign(attoVote), - ) - - val votePush = - AttoVotePush( - vote = attoSignedVote, - ) - - val strategy = - if (attoVote.isFinal()) { - BroadcastStrategy.EVERYONE - } else { - BroadcastStrategy.VOTERS + job?.cancel() + val newJob = scope.launch { + if (consensusChanged) { + delay(Election.ELECTION_STABILITY_MINIMAL_TIME.toKotlinDuration()) } - logger.debug { "Sending to $strategy $votePush" } + val attoVote = + AttoVote( + version = 0U.toAttoVersion(), + algorithm = thisNode.algorithm, + publicKey = thisNode.publicKey, + blockAlgorithm = transaction.algorithm, + blockHash = transaction.hash, + timestamp = timestamp.toAtto(), + ) + val attoSignedVote = + AttoSignedVote( + vote = attoVote, + signature = signer.sign(attoVote), + ) + + val votePush = + AttoVotePush( + vote = attoSignedVote, + ) + + val strategy = + if (attoVote.isFinal()) { + BroadcastStrategy.EVERYONE + } else { + BroadcastStrategy.VOTERS + } + + logger.debug { "Sending to $strategy $votePush" } - messagePublisher.publish(BroadcastNetworkMessage(strategy, emptySet(), votePush)) - eventPublisher.publish(VoteValidated(transaction, Vote.from(weight, attoSignedVote))) + messagePublisher.publish(BroadcastNetworkMessage(strategy, emptySet(), votePush)) + eventPublisher.publish(VoteValidated(transaction, Vote.from(weight, attoSignedVote))) + } + job = newJob + newJob.join() } @@ -247,34 +262,7 @@ class ElectionVoter( logger.trace { "Consensus changed from ${oldTransaction.hash} to ${transaction.hash}" } - if (oldTransaction == transaction) { - publishVote(transaction, timestamp) - } else { - scheduleDelayedVote(transaction, timestamp) - } - } - - private fun scheduleDelayedVote( - transaction: Transaction, - timestamp: Instant, - ) { - pendingJob?.cancel() - val job = - scope.launch { - delay(Election.ELECTION_STABILITY_MINIMAL_TIME.toKotlinDuration()) - mutex.withLock { - publishVote(transaction, timestamp) - pendingJob = null - } - } - pendingJob = job - } - - - suspend fun remove() = mutex.withLock { - pendingJob?.cancel() - consensusMap.remove(publicKeyHeight) - logger.trace { "Removed ${transaction.hash} from the voter queue" } + publishVote(transaction, timestamp, consensusChanged = oldTransaction != transaction) } } } From f058377f7981f994881803c1e02fbacee275446d Mon Sep 17 00:00:00 2001 From: rotilho Date: Thu, 12 Mar 2026 22:18:46 +0100 Subject: [PATCH 5/7] Clean up and formatting --- build.gradle.kts | 3 - .../node/election/ElectionVoterBenchmark.kt | 4 +- .../cash/atto/node/election/ElectionVoter.kt | 116 +++++++++--------- 3 files changed, 59 insertions(+), 64 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index f268fb43..d5bf378d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,5 +1,3 @@ -import kotlinx.benchmark.gradle.JvmBenchmarkTarget - plugins { val kotlinVersion = "2.3.0" @@ -137,7 +135,6 @@ dependencies { testImplementation("org.testcontainers:mysql") testImplementation("org.testcontainers:r2dbc") add("benchmarkImplementation", "org.jetbrains.kotlinx:kotlinx-benchmark-runtime:0.4.14") - } tasks.withType { diff --git a/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt b/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt index 65b4d7c1..35238d2e 100644 --- a/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt +++ b/src/benchmark/kotlin/cash/atto/node/election/ElectionVoterBenchmark.kt @@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger import kotlin.time.Duration.Companion.milliseconds @State(Scope.Benchmark) -class ElectionVoterFlowBenchmark { +class ElectionVoterBenchmark { private companion object { val EVENT_TIMESTAMP: Instant = Instant.EPOCH } @@ -121,7 +121,7 @@ class ElectionVoterFlowBenchmark { updatedAccount = account, transaction = transaction, timestamp = EVENT_TIMESTAMP, - ) + ), ) } diff --git a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt index 6193d736..1b4dfee0 100644 --- a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt +++ b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt @@ -76,9 +76,7 @@ class ElectionVoter( scope.cancel() } - private fun consensusFor(transaction: Transaction): Consensus? { - return consensusMap[transaction.toPublicKeyHeight()] - } + private fun consensusFor(transaction: Transaction): Consensus? = consensusMap[transaction.toPublicKeyHeight()] @EventListener suspend fun process(event: ElectionStarted) { @@ -139,7 +137,6 @@ class ElectionVoter( private fun canVote(weight: AttoAmount): Boolean = thisNode.isVoter() && weight >= MIN_WEIGHT - @OptIn(ExperimentalAtomicApi::class) private inner class Consensus( private var transaction: Transaction, @@ -150,14 +147,13 @@ class ElectionVoter( private var consensusTimestamp = transaction.block.timestamp.toJavaInstant() private var job: Job? = null - suspend fun start( - timestamp: Instant, - ) = mutex.withLock { - if (!started.compareAndSet(expectedValue = false, newValue = true)) { - return@withLock + suspend fun start(timestamp: Instant) = + mutex.withLock { + if (!started.compareAndSet(expectedValue = false, newValue = true)) { + return@withLock + } + applyConsensus(transaction, timestamp, forceVote = true) } - applyConsensus(transaction, timestamp, forceVote = true) - } private fun remove() { job?.cancel() @@ -173,19 +169,21 @@ class ElectionVoter( applyConsensus(transaction, timestamp) } - suspend fun reaffirm() = mutex.withLock { - publishVote(transaction, Instant.now(), consensusChanged = false) - } - + suspend fun reaffirm() = + mutex.withLock { + publishVote(transaction, Instant.now(), consensusChanged = false) + } - suspend fun finalVote(transaction: Transaction) = mutex.withLock { - publishVote(transaction, AttoVote.finalTimestamp.toJavaInstant(), consensusChanged = false) - remove() - } + suspend fun finalVote(transaction: Transaction) = + mutex.withLock { + publishVote(transaction, AttoVote.finalTimestamp.toJavaInstant(), consensusChanged = false) + remove() + } - suspend fun expire() = mutex.withLock { - remove() - } + suspend fun expire() = + mutex.withLock { + remove() + } private suspend fun publishVote( transaction: Transaction, @@ -199,52 +197,52 @@ class ElectionVoter( } job?.cancel() - val newJob = scope.launch { - if (consensusChanged) { - delay(Election.ELECTION_STABILITY_MINIMAL_TIME.toKotlinDuration()) - } - - val attoVote = - AttoVote( - version = 0U.toAttoVersion(), - algorithm = thisNode.algorithm, - publicKey = thisNode.publicKey, - blockAlgorithm = transaction.algorithm, - blockHash = transaction.hash, - timestamp = timestamp.toAtto(), - ) - val attoSignedVote = - AttoSignedVote( - vote = attoVote, - signature = signer.sign(attoVote), - ) - - val votePush = - AttoVotePush( - vote = attoSignedVote, - ) - - val strategy = - if (attoVote.isFinal()) { - BroadcastStrategy.EVERYONE - } else { - BroadcastStrategy.VOTERS + val newJob = + scope.launch { + if (consensusChanged) { + delay(Election.ELECTION_STABILITY_MINIMAL_TIME.toKotlinDuration()) } - logger.debug { "Sending to $strategy $votePush" } - - messagePublisher.publish(BroadcastNetworkMessage(strategy, emptySet(), votePush)) - eventPublisher.publish(VoteValidated(transaction, Vote.from(weight, attoSignedVote))) - } + val attoVote = + AttoVote( + version = 0U.toAttoVersion(), + algorithm = thisNode.algorithm, + publicKey = thisNode.publicKey, + blockAlgorithm = transaction.algorithm, + blockHash = transaction.hash, + timestamp = timestamp.toAtto(), + ) + val attoSignedVote = + AttoSignedVote( + vote = attoVote, + signature = signer.sign(attoVote), + ) + + val votePush = + AttoVotePush( + vote = attoSignedVote, + ) + + val strategy = + if (attoVote.isFinal()) { + BroadcastStrategy.EVERYONE + } else { + BroadcastStrategy.VOTERS + } + + logger.debug { "Sending to $strategy $votePush" } + + messagePublisher.publish(BroadcastNetworkMessage(strategy, emptySet(), votePush)) + eventPublisher.publish(VoteValidated(transaction, Vote.from(weight, attoSignedVote))) + } job = newJob newJob.join() } - private suspend fun applyConsensus( transaction: Transaction, timestamp: Instant, - forceVote: Boolean = false + forceVote: Boolean = false, ) { val oldTransaction = this.transaction val oldTimestamp = this.consensusTimestamp From dd080102352159df757eab056c731c484c1e58ff Mon Sep 17 00:00:00 2001 From: rotilho Date: Thu, 12 Mar 2026 22:23:01 +0100 Subject: [PATCH 6/7] Spread `ElectionVoter` delays to reduce simultaneous voting --- .../kotlin/cash/atto/node/election/ElectionVoter.kt | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt index 1b4dfee0..3157daa4 100644 --- a/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt +++ b/src/main/kotlin/cash/atto/node/election/ElectionVoter.kt @@ -44,7 +44,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import kotlin.concurrent.atomics.AtomicBoolean import kotlin.concurrent.atomics.ExperimentalAtomicApi -import kotlin.time.toKotlinDuration +import kotlin.random.Random @Service class ElectionVoter( @@ -200,7 +200,15 @@ class ElectionVoter( val newJob = scope.launch { if (consensusChanged) { - delay(Election.ELECTION_STABILITY_MINIMAL_TIME.toKotlinDuration()) + val baseDelay = Election.ELECTION_STABILITY_MINIMAL_TIME.toMillis() + /* + * Extra delay spreads votes across a 2s window so that not all nodes + * cast their votes at the exact same instant, reducing the chance of + * a race condition where simultaneous votes could cause more + * consensus flips. + */ + val extraDelay = Random.nextLong(0, 2001) + delay(baseDelay + extraDelay) } val attoVote = From d4e7547161203e6325f6b464c61419174626c8ef Mon Sep 17 00:00:00 2001 From: rotilho Date: Thu, 12 Mar 2026 22:48:16 +0100 Subject: [PATCH 7/7] Trig ElectionConsensusChanged just when consensus changes --- .../cash/atto/node/election/Election.kt | 6 +- .../cash/atto/node/election/ElectionTest.kt | 346 ++++++++++++++++++ 2 files changed, 351 insertions(+), 1 deletion(-) create mode 100644 src/test/kotlin/cash/atto/node/election/ElectionTest.kt diff --git a/src/main/kotlin/cash/atto/node/election/Election.kt b/src/main/kotlin/cash/atto/node/election/Election.kt index 87dcee2f..fd408cf9 100644 --- a/src/main/kotlin/cash/atto/node/election/Election.kt +++ b/src/main/kotlin/cash/atto/node/election/Election.kt @@ -99,6 +99,7 @@ class Election( return@withLock } + val previousProvisionalTransactionElection = publicKeyHeightElection.getProvisionalLeader() if (!publicKeyHeightElection.add(vote)) { logger.trace { "Vote is old and it won't be considered in the election $publicKeyHeight $vote" } return@withLock @@ -126,7 +127,10 @@ class Election( "Transaction ${provisionalTransactionElection.transaction.hash} is the current provisional leader but " + "totalWeight(${provisionalTransactionElection.totalWeight}) < minimalConfirmationWeight($minimalConfirmationWeight)." } - eventPublisher.publish(ElectionConsensusChanged(account, provisionalTransactionElection.transaction)) + + if (provisionalTransactionElection.transaction != previousProvisionalTransactionElection.transaction) { + eventPublisher.publish(ElectionConsensusChanged(account, provisionalTransactionElection.transaction)) + } } @Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES) diff --git a/src/test/kotlin/cash/atto/node/election/ElectionTest.kt b/src/test/kotlin/cash/atto/node/election/ElectionTest.kt new file mode 100644 index 00000000..d0011c9b --- /dev/null +++ b/src/test/kotlin/cash/atto/node/election/ElectionTest.kt @@ -0,0 +1,346 @@ +package cash.atto.node.election + +import cash.atto.commons.AttoAlgorithm +import cash.atto.commons.AttoAmount +import cash.atto.commons.AttoBlock +import cash.atto.commons.AttoHash +import cash.atto.commons.AttoInstant +import cash.atto.commons.AttoNetwork +import cash.atto.commons.AttoPublicKey +import cash.atto.commons.AttoReceiveBlock +import cash.atto.commons.AttoSignature +import cash.atto.commons.AttoWork +import cash.atto.commons.toAttoHeight +import cash.atto.commons.toAttoVersion +import cash.atto.node.EventPublisher +import cash.atto.node.account.Account +import cash.atto.node.account.AccountUpdated +import cash.atto.node.transaction.Transaction +import cash.atto.node.transaction.TransactionSource +import cash.atto.node.transaction.TransactionValidated +import cash.atto.node.vote.Vote +import cash.atto.node.vote.VoteValidated +import cash.atto.node.vote.weight.VoteWeighter +import io.mockk.every +import io.mockk.impl.annotations.InjectMockKs +import io.mockk.impl.annotations.MockK +import io.mockk.impl.annotations.RelaxedMockK +import io.mockk.junit5.MockKExtension +import io.mockk.verify +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import java.time.Instant +import kotlin.random.Random + +@ExtendWith(MockKExtension::class) +class ElectionTest { + @MockK + lateinit var properties: ElectionProperties + + @RelaxedMockK + lateinit var voteWeighter: VoteWeighter + + @RelaxedMockK + lateinit var eventPublisher: EventPublisher + + @RelaxedMockK + lateinit var account: Account + + @InjectMockKs + lateinit var election: Election + + private val minimalWeight = AttoAmount(1000UL) + + @BeforeEach + fun beforeEach() { + every { voteWeighter.getMinimalConfirmationWeight() } returns minimalWeight + every { properties.expiringAfterTimeInSeconds } returns 300L + every { properties.expiredAfterTimeInSeconds } returns 600L + election.clear() + } + + @Test + fun `should start election when TransactionValidated event is received`() { + // given + val transaction = Transaction.sample() + val event = TransactionValidated(account, transaction) + + // when + runBlocking { + election.start(event) + } + + // then + assertEquals(1, election.getSize()) + verify { + eventPublisher.publish( + match { it is ElectionStarted && it.transaction == transaction }, + ) + } + } + + @Test + fun `should not duplicate election for same transaction`() { + // given + val transaction = Transaction.sample() + + // when + runBlocking { + election.start(TransactionValidated(account, transaction)) + election.start(TransactionValidated(account, transaction)) + } + + // then + assertEquals(1, election.getSize()) + } + + @Test + fun `should remove election when AccountUpdated event is received`() { + // given + val transaction = Transaction.sample() + + // when + runBlocking { + election.start(TransactionValidated(account, transaction)) + assertEquals(1, election.getSize()) + + val accountUpdated = AccountUpdated(TransactionSource.ELECTION, account, account, transaction) + election.process(accountUpdated) + } + + // then + assertEquals(0, election.getSize()) + } + + @Test + fun `should ignore vote when no election exists`() { + // given + val transaction = Transaction.sample() + val vote = Vote.sample(blockHash = transaction.hash, weight = minimalWeight) + val event = VoteValidated(transaction, vote) + + // when + runBlocking { + election.process(event) + } + + // then + assertEquals(0, election.getSize()) + verify(exactly = 0) { + eventPublisher.publish(match { it is ElectionConsensusReached }) + } + } + + @Test + fun `should reach consensus when vote weight meets threshold`() { + // given + val transaction = Transaction.sample() + val vote = Vote.sample(blockHash = transaction.hash, weight = minimalWeight) + + // when + runBlocking { + election.start(TransactionValidated(account, transaction)) + election.process(VoteValidated(transaction, vote)) + } + + // then + assertEquals(0, election.getSize()) + verify { + eventPublisher.publish( + match { it is ElectionConsensusReached && it.transaction == transaction }, + ) + } + } + + @Test + fun `should publish consensus changed when provisional leader changes`() { + // given + every { voteWeighter.getMinimalConfirmationWeight() } returns AttoAmount(2000UL) + val publicKey = AttoPublicKey(Random.nextBytes(ByteArray(32))) + val transactionA = Transaction.sample(publicKey = publicKey) + val transactionB = Transaction.sample(publicKey = publicKey) + val voteA = Vote.sample(blockHash = transactionA.hash, weight = AttoAmount(100UL)) + val voteB = Vote.sample(blockHash = transactionB.hash, weight = AttoAmount(500UL)) + + // when + runBlocking { + election.start(TransactionValidated(account, transactionA)) + election.start(TransactionValidated(account, transactionB)) + // First make transactionA the definitive leader + election.process(VoteValidated(transactionA, voteA)) + // Then transactionB overtakes + election.process(VoteValidated(transactionB, voteB)) + } + + // then + assertEquals(1, election.getSize()) + verify { + eventPublisher.publish( + match { it is ElectionConsensusChanged && it.transaction == transactionB }, + ) + } + } + + @Test + fun `should expire elections older than expiring threshold`() { + // given + val oldTransaction = Transaction.sample(receivedAt = Instant.now().minusSeconds(600)) + + // when + runBlocking { + election.start(TransactionValidated(account, oldTransaction)) + election.processExpiring() + } + + // then + verify { + eventPublisher.publish( + match { it is ElectionExpiring && it.transaction == oldTransaction }, + ) + } + } + + @Test + fun `should remove staled elections older than expired threshold`() { + // given + val oldTransaction = Transaction.sample(receivedAt = Instant.now().minusSeconds(900)) + + // when + runBlocking { + election.start(TransactionValidated(account, oldTransaction)) + election.stopObservingStaled() + } + + // then + assertEquals(0, election.getSize()) + verify { + eventPublisher.publish( + match { it is ElectionExpired && it.transaction == oldTransaction }, + ) + } + } + + @Test + fun `should clear all elections`() { + // given + val transaction = Transaction.sample() + + // when + runBlocking { + election.start(TransactionValidated(account, transaction)) + assertEquals(1, election.getSize()) + election.clear() + } + + // then + assertEquals(0, election.getSize()) + } + + @Test + fun `should not publish consensus changed when provisional leader stays the same`() { + // given + every { voteWeighter.getMinimalConfirmationWeight() } returns AttoAmount(2000UL) + val transaction = Transaction.sample() + val vote1 = Vote.sample(blockHash = transaction.hash, weight = AttoAmount(500UL)) + val vote2 = Vote.sample(blockHash = transaction.hash, weight = AttoAmount(500UL)) + + // when + runBlocking { + election.start(TransactionValidated(account, transaction)) + election.process(VoteValidated(transaction, vote1)) + election.process(VoteValidated(transaction, vote2)) + } + + // then + assertEquals(1, election.getSize()) + verify(exactly = 0) { + eventPublisher.publish( + match { it is ElectionConsensusChanged }, + ) + } + } + + @Test + fun `should ignore old vote for same voter`() { + // given + every { voteWeighter.getMinimalConfirmationWeight() } returns AttoAmount(2000UL) + val transaction = Transaction.sample() + val voterKey = AttoPublicKey(Random.nextBytes(ByteArray(32))) + val newerVote = + Vote.sample( + blockHash = transaction.hash, + publicKey = voterKey, + weight = AttoAmount(500UL), + timestamp = Instant.now(), + ) + val olderVote = + Vote.sample( + blockHash = transaction.hash, + publicKey = voterKey, + weight = AttoAmount(500UL), + timestamp = Instant.now().minusSeconds(10), + ) + + // when + runBlocking { + election.start(TransactionValidated(account, transaction)) + election.process(VoteValidated(transaction, newerVote)) + election.process(VoteValidated(transaction, olderVote)) + } + + // then + assertEquals(1, election.getSize()) + // No ConsensusChanged event since leader stays the same (single transaction) + verify(exactly = 0) { + eventPublisher.publish( + match { it is ElectionConsensusChanged }, + ) + } + } + + private fun AttoBlock.Companion.sample(publicKey: AttoPublicKey = AttoPublicKey(Random.nextBytes(ByteArray(32)))): AttoBlock = + AttoReceiveBlock( + version = 0U.toAttoVersion(), + network = AttoNetwork.LOCAL, + algorithm = AttoAlgorithm.V1, + publicKey = publicKey, + height = 2U.toAttoHeight(), + balance = AttoAmount.MAX, + timestamp = AttoInstant.now(), + previous = AttoHash(Random.nextBytes(ByteArray(32))), + sendHashAlgorithm = AttoAlgorithm.V1, + sendHash = AttoHash(Random.nextBytes(ByteArray(32))), + ) + + private fun Transaction.Companion.sample( + publicKey: AttoPublicKey = AttoPublicKey(Random.nextBytes(ByteArray(32))), + receivedAt: Instant = Instant.now(), + ): Transaction = + Transaction( + AttoBlock.sample(publicKey = publicKey), + AttoSignature(Random.nextBytes(ByteArray(64))), + AttoWork(Random.nextBytes(ByteArray(8))), + receivedAt = receivedAt, + ) + + private fun Vote.Companion.sample( + blockHash: AttoHash, + weight: AttoAmount, + publicKey: AttoPublicKey = AttoPublicKey(Random.nextBytes(ByteArray(32))), + timestamp: Instant = Instant.now(), + ): Vote = + Vote( + hash = AttoHash(Random.nextBytes(ByteArray(32))), + version = 0U.toAttoVersion(), + algorithm = AttoAlgorithm.V1, + publicKey = publicKey, + blockAlgorithm = AttoAlgorithm.V1, + blockHash = blockHash, + timestamp = timestamp, + signature = AttoSignature(Random.nextBytes(ByteArray(64))), + weight = weight, + ) +}