diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/AccountantController.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/AccountantController.kt
index b2a1eade6..ae9c8420c 100644
--- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/AccountantController.kt
+++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/AccountantController.kt
@@ -36,8 +36,11 @@ class AccountantController(
val canFulfil = runCatching { walletProxy.canFulfil(currency, "main", uuid, amount) }
.onFailure { logger.error(it.message) }
.getOrElse { false }
- val unprocessed = financialActionLoader.countUnprocessed(uuid, currency, SubmitOrderEvent::class.simpleName!!)
- return BooleanResponse(unprocessed <= 0 && canFulfil)
+ if ( canFulfil ) {
+ val unprocessed = financialActionLoader.countUnprocessed(uuid, currency, SubmitOrderEvent::class.simpleName!!)
+ return BooleanResponse(unprocessed <= 0 )
+ } else
+ return BooleanResponse(false)
}
@GetMapping("/config/{pair}/fee/{direction}-{userLevel}")
diff --git a/wallet/wallet-app/pom.xml b/wallet/wallet-app/pom.xml
index 516dff0e4..73cbb80c9 100644
--- a/wallet/wallet-app/pom.xml
+++ b/wallet/wallet-app/pom.xml
@@ -116,8 +116,47 @@
micrometer-registry-prometheus
runtime
-
+
+ org.testcontainers
+ testcontainers
+ 1.18.0
+ test
+
+
+ org.testcontainers
+ postgresql
+ 1.18.0
+ test
+
+
+ org.testcontainers
+ r2dbc
+ 1.18.0
+ test
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+ 3.2.6
+ test-jar
+ test
+ test-binder
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ 2.9.4
+ test
+
+
+ org.testcontainers
+ kafka
+ 1.18.0
+ test
+
+
diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/PrometheusHealthExtension.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/PrometheusHealthExtension.kt
index 02dc4fcfc..fa08505bd 100644
--- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/PrometheusHealthExtension.kt
+++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/PrometheusHealthExtension.kt
@@ -5,10 +5,12 @@ import io.micrometer.core.instrument.MeterRegistry
import org.springframework.boot.actuate.health.HealthComponent
import org.springframework.boot.actuate.health.HealthEndpoint
import org.springframework.boot.actuate.health.SystemHealth
+import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
@Component
+@Profile("!test")
class PrometheusHealthExtension(
private val registry: MeterRegistry,
private val endpoint: HealthEndpoint
diff --git a/wallet/wallet-app/src/test/kotlin/co/nilin/opex/wallet/app/WalletAppTest.kt b/wallet/wallet-app/src/test/kotlin/co/nilin/opex/wallet/app/WalletAppTest.kt
new file mode 100644
index 000000000..889a1f3b8
--- /dev/null
+++ b/wallet/wallet-app/src/test/kotlin/co/nilin/opex/wallet/app/WalletAppTest.kt
@@ -0,0 +1,22 @@
+package co.nilin.opex.wallet.app
+
+import org.junit.jupiter.api.Test
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration
+import org.springframework.context.annotation.Import
+import org.springframework.kafka.test.context.EmbeddedKafka
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ActiveProfiles
+
+@SpringBootTest
+@DirtiesContext
+@ActiveProfiles("test")
+@Import(TestChannelBinderConfiguration::class)
+@EmbeddedKafka(partitions = 1, brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"])
+class WalletAppTest {
+
+ @Test
+ fun contextLoad() {
+
+ }
+}
\ No newline at end of file
diff --git a/wallet/wallet-app/src/test/kotlin/co/nilin/opex/wallet/app/service/TransferManagerImplIT.kt b/wallet/wallet-app/src/test/kotlin/co/nilin/opex/wallet/app/service/TransferManagerImplIT.kt
new file mode 100644
index 000000000..1efd66180
--- /dev/null
+++ b/wallet/wallet-app/src/test/kotlin/co/nilin/opex/wallet/app/service/TransferManagerImplIT.kt
@@ -0,0 +1,209 @@
+package co.nilin.opex.wallet.app.service
+
+import co.nilin.opex.wallet.core.exc.ConcurrentBalanceChangException
+import co.nilin.opex.wallet.core.inout.TransferCommand
+import co.nilin.opex.wallet.core.model.Amount
+import co.nilin.opex.wallet.core.spi.CurrencyService
+import co.nilin.opex.wallet.core.spi.TransferManager
+import co.nilin.opex.wallet.core.spi.WalletManager
+import co.nilin.opex.wallet.core.spi.WalletOwnerManager
+import kotlinx.coroutines.async
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration
+import org.springframework.context.annotation.Import
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ActiveProfiles
+import java.math.BigDecimal
+import java.util.*
+
+@SpringBootTest
+@DirtiesContext
+@ActiveProfiles("test")
+@Import(TestChannelBinderConfiguration::class)
+class TransferManagerImplIT {
+ @Autowired
+ lateinit var transferManager: TransferManager
+
+ @Autowired
+ lateinit var currencyService: CurrencyService
+
+ @Autowired
+ lateinit var walletManager: WalletManager
+
+ @Autowired
+ lateinit var walletOwnerManager: WalletOwnerManager
+
+ val senderWalletType = "main"
+ val receiverWalletType = "exchange"
+ val cc = "CC"
+ val amount = BigDecimal.valueOf(10)
+ var sourceUuid: String? = null
+
+ @BeforeEach
+ fun setup() {
+ sourceUuid = UUID.randomUUID().toString()
+ setupWallets(sourceUuid!!)
+ }
+
+ @Test
+ fun givenSameSenderWallet_whenConcurrentTransfers_thenSecondTransferFail() {
+
+ val block: () -> Unit = {
+ runBlocking {
+ val currency = currencyService.getCurrency(cc)!!
+ val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)
+ val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
+ val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)
+
+ launch {
+ transferManager.transfer(
+ TransferCommand(
+ sourceWallet!!,
+ receiverWallet!!,
+ Amount(sourceWallet.currency, amount),
+ "Amount1 ${System.currentTimeMillis()}", "Ref1 ${System.currentTimeMillis()}", emptyMap()
+ )
+ )
+ }
+ launch {
+ transferManager.transfer(
+ TransferCommand(
+ sourceWallet!!,
+ receiverWallet!!,
+ Amount(sourceWallet.currency, amount),
+ "Amount2 ${System.currentTimeMillis()}", "Ref2 ${System.currentTimeMillis()}", emptyMap()
+ )
+ )
+ }
+ }
+ }
+ try {
+ block.invoke()
+ } catch (_: ConcurrentBalanceChangException) {
+
+ }
+ runBlocking {
+ val currency = currencyService.getCurrency(cc)!!
+ val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)
+ val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
+ val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)
+
+ assertEquals(amount, sourceWallet!!.balance.amount)
+ assertEquals(amount, receiverWallet!!.balance.amount)
+ }
+ }
+
+ @Test
+ fun givenSameReceiverWallet_whenConcurrentTransfers_thenTransfersSuccess() {
+ runBlocking {
+ val currency = currencyService.getCurrency(cc)!!
+ val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)
+ val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, receiverWalletType, currency)
+
+ val source2Uuid = UUID.randomUUID().toString()
+ setupWallets(source2Uuid)
+ val sourceOwner2 = walletOwnerManager.findWalletOwner(source2Uuid)
+
+ val t1 = async {
+ val sourceWallet1 = walletManager.findWalletByOwnerAndCurrencyAndType(owner, senderWalletType, currency)
+ transferManager.transfer(
+ TransferCommand(
+ sourceWallet1!!,
+ receiverWallet!!,
+ Amount(sourceWallet1.currency, amount),
+ "Amount1 ${System.currentTimeMillis()}", "Ref1 ${System.currentTimeMillis()}", emptyMap()
+ )
+ )
+ }
+ val t2 = async {
+ val sourceWallet2 = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner2!!, senderWalletType, currency)
+ transferManager.transfer(
+ TransferCommand(
+ sourceWallet2!!,
+ receiverWallet!!,
+ Amount(sourceWallet2.currency, amount),
+ "Amount2 ${System.currentTimeMillis()}", "Ref2 ${System.currentTimeMillis()}", emptyMap()
+ )
+ )
+ }
+ t1.await()
+ t2.await()
+
+ val sourceWallet1Refresh = walletManager.findWalletByOwnerAndCurrencyAndType(owner, senderWalletType, currency)
+ val sourceWallet2Refresh = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner2!!, senderWalletType, currency)
+ val receiverWalletRefresh = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)
+
+ assertEquals(amount, sourceWallet1Refresh!!.balance.amount)
+ assertEquals(amount, sourceWallet2Refresh!!.balance.amount)
+ assertEquals(amount.plus(amount), receiverWalletRefresh!!.balance.amount)
+ }
+
+
+ }
+
+ @Test
+ fun givenSameSenderWallet_whenSequentialTransfers_thenTransfersSuccess() {
+ runBlocking {
+ val currency = currencyService.getCurrency(cc)!!
+ val owner = walletOwnerManager.findWalletOwner(sourceUuid!!)
+
+ async {
+ val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
+ val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)
+
+ transferManager.transfer(
+ TransferCommand(
+ sourceWallet!!,
+ receiverWallet!!,
+ Amount(sourceWallet.currency, amount),
+ "Amount1 ${System.currentTimeMillis()}", "Ref1 ${System.currentTimeMillis()}", emptyMap()
+ )
+ )
+ }.await()
+ async {
+ val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
+ val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)
+
+ transferManager.transfer(
+ TransferCommand(
+ sourceWallet!!,
+ receiverWallet!!,
+ Amount(sourceWallet!!.currency, amount),
+ "Amount2 ${System.currentTimeMillis()}", "Ref2 ${System.currentTimeMillis()}", emptyMap()
+ )
+ )
+ }.await()
+ val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner!!, senderWalletType, currency)
+ val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType(owner, receiverWalletType, currency)
+
+ assertEquals(BigDecimal.ZERO, sourceWallet!!.balance.amount)
+ assertEquals(amount.plus(amount), receiverWallet!!.balance.amount)
+ }
+ }
+
+ fun setupWallets(sourceUuid: String) {
+ runBlocking {
+ var currency = currencyService.getCurrency(cc)
+ if (currency == null) {
+ currencyService.deleteCurrency(cc)
+ currencyService.addCurrency(cc, cc, BigDecimal.ONE)
+ currency = currencyService.getCurrency(cc)
+ }
+ val sourceOwner = walletOwnerManager.createWalletOwner(sourceUuid, "not set", "")
+ walletManager.createWallet(sourceOwner, Amount(currency!!, amount.multiply(BigDecimal.valueOf(2))), currency, senderWalletType)
+ walletManager.createWallet(
+ sourceOwner,
+ Amount(currency, BigDecimal.ZERO),
+ currency,
+ receiverWalletType
+ )
+ }
+ }
+}
+
diff --git a/wallet/wallet-app/src/test/resources/application.yml b/wallet/wallet-app/src/test/resources/application.yml
new file mode 100644
index 000000000..8246bdf42
--- /dev/null
+++ b/wallet/wallet-app/src/test/resources/application.yml
@@ -0,0 +1,58 @@
+server.port: 8080
+management:
+ health:
+ vault:
+ enabled: false
+ endpoints:
+ web:
+ base-path: /actuator
+ exposure:
+ include: ["health", "metrics"]
+ endpoint:
+ health:
+ show-details: always
+ metrics:
+ enabled: true
+ prometheus:
+ enabled: false
+spring:
+ jackson:
+ serialization:
+ write-dates-as-timestamps: false
+ application:
+ name: opex-wallet
+ main:
+ allow-bean-definition-overriding: false
+ allow-circular-references: true
+ kafka:
+ bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092}
+ consumer:
+ auto-offset-reset: earliest
+ group-id: wallet
+ r2dbc:
+ url: r2dbc:tc:postgresql:///databasename?TC_IMAGE_TAG=9.6.8
+ initialization-mode: always
+ cloud:
+ bootstrap:
+ enabled: true
+ discovery:
+ enabled: false
+ consul:
+ enabled: false
+ config:
+ enabled: false
+ vault:
+ enabled: false
+app:
+ auth:
+ cert-url: none
+ system:
+ uuid: 1
+logging:
+ level:
+ org.apache.kafka: ERROR
+ co.nilin: DEBUG
+ reactor.netty.http.client: DEBUG
+swagger.authUrl: ${SWAGGER_AUTH_URL:https://api.opex.dev/auth}/realms/opex/protocol/openid-connect/token
+test:
+ topic: embedded-test-topic
\ No newline at end of file
diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/exc/ConcurrentBalanceChangException.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/exc/ConcurrentBalanceChangException.kt
new file mode 100644
index 000000000..3d10733dd
--- /dev/null
+++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/exc/ConcurrentBalanceChangException.kt
@@ -0,0 +1,3 @@
+package co.nilin.opex.wallet.core.exc
+
+class ConcurrentBalanceChangException(override val message: String?): Exception()
\ No newline at end of file
diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Wallet.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Wallet.kt
index 932e492ab..2d2f6eacf 100644
--- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Wallet.kt
+++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Wallet.kt
@@ -5,5 +5,6 @@ data class Wallet(
val owner: WalletOwner,
val balance: Amount,
val currency: Currency,
- val type: String
+ val type: String,
+ val version: Long?
)
\ No newline at end of file
diff --git a/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/sample/Samples.kt b/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/sample/Samples.kt
index 4d8ee6cde..18bc3851d 100644
--- a/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/sample/Samples.kt
+++ b/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/sample/Samples.kt
@@ -34,6 +34,7 @@ object VALID {
Amount(CURRENCY, BigDecimal.valueOf(1.5)),
CURRENCY,
WALLET_TYPE_MAIN
+ , 0
)
val DEST_WALLET_OWNER = SOURCE_WALLET_OWNER.copy(2, "e1950578-ef22-44e4-89f5-0b78feb03e2a")
diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt
index f46bb9228..20fd3be54 100644
--- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt
+++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt
@@ -33,6 +33,10 @@ interface WalletRepository : ReactiveCrudRepository {
fun findByOwnerAndCurrency(owner: Long, currency: String): Flux
@Modifying
- @Query("update wallet set balance = balance + :balance where id = :id")
+ @Query("update wallet set balance = balance + :balance, version = version + 1 where id = :id and version = :version")
+ fun updateBalance(id: Long, delta: BigDecimal, version: Long): Mono
+
+ @Modifying
+ @Query("update wallet set balance = balance + :balance, version = version + 1 where id = :id")
fun updateBalance(id: Long, delta: BigDecimal): Mono
}
\ No newline at end of file
diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dto/WalletDto.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dto/WalletDto.kt
index 0b9d8e7c4..daf126800 100644
--- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dto/WalletDto.kt
+++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dto/WalletDto.kt
@@ -11,7 +11,8 @@ fun WalletModel.toPlainObject(walletOwner: WalletOwner, currency: Currency) = Wa
walletOwner,
Amount(currency, balance),
currency,
- type
+ type,
+ version
)
fun Wallet.toModel() = WalletModel(
diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImpl.kt
index 1979ae670..1e752351a 100644
--- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImpl.kt
+++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImpl.kt
@@ -1,5 +1,6 @@
package co.nilin.opex.wallet.ports.postgres.impl
+import co.nilin.opex.wallet.core.exc.ConcurrentBalanceChangException
import co.nilin.opex.wallet.core.model.Amount
import co.nilin.opex.wallet.core.model.Currency
import co.nilin.opex.wallet.core.model.Wallet
@@ -11,6 +12,7 @@ import co.nilin.opex.wallet.ports.postgres.model.WalletModel
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
+import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.math.BigDecimal
import java.time.LocalDateTime
@@ -24,6 +26,9 @@ class WalletManagerImpl(
val currencyRepository: CurrencyRepository
) : WalletManager {
+ val logger = LoggerFactory.getLogger(WalletManagerImpl::class.java)
+
+
override suspend fun isDepositAllowed(wallet: Wallet, amount: BigDecimal): Boolean {
var limit = walletLimitsRepository.findByOwnerAndCurrencyAndWalletAndAction(
wallet.owner.id!!, wallet.currency.symbol, wallet.id!!, "deposit"
@@ -119,14 +124,20 @@ class WalletManagerImpl(
override suspend fun increaseBalance(wallet: Wallet, amount: BigDecimal) {
require(amount >= BigDecimal.ZERO)
+ logger.info("Increase balance {}, {}, {}, {}", wallet.id, wallet.balance, amount, wallet.version)
val updateCount = walletRepository.updateBalance(wallet.id!!, amount).awaitFirst()
- assert(updateCount == 1) { "Decrease wallet balance failed" }
+ if (updateCount != 1) {
+ throw ConcurrentBalanceChangException("Increase wallet balance failed")
+ }
}
override suspend fun decreaseBalance(wallet: Wallet, amount: BigDecimal) {
require(amount >= BigDecimal.ZERO)
- val updateCount = walletRepository.updateBalance(wallet.id!!, -amount).awaitFirst()
- assert(updateCount == 1) { "Decrease wallet balance failed" }
+ logger.info("Decrease balance {}, {}, {}, {}", wallet.id, wallet.balance, amount, wallet.version)
+ val updateCount = walletRepository.updateBalance(wallet.id!!, -amount, wallet.version!!).awaitFirst()
+ if (updateCount != 1) {
+ throw ConcurrentBalanceChangException("Decrease wallet balance failed")
+ }
}
override suspend fun findWalletByOwnerAndCurrencyAndType(
@@ -147,7 +158,8 @@ class WalletManagerImpl(
walletOwner,
Amount(existingCurrency.toPlainObject(), walletModel.balance),
existingCurrency.toPlainObject(),
- walletModel.type
+ walletModel.type,
+ walletModel.version
)
}
@@ -163,7 +175,8 @@ class WalletManagerImpl(
ownerModel.toPlainObject(),
Amount(currency.toPlainObject(), it.balance),
currency.toPlainObject(),
- it.type
+ it.type,
+ it.version
)
}
}
@@ -180,7 +193,8 @@ class WalletManagerImpl(
ownerModel.toPlainObject(),
Amount(currency.toPlainObject(), it.balance),
currency.toPlainObject(),
- it.type
+ it.type,
+ it.version
)
}
}
@@ -197,7 +211,8 @@ class WalletManagerImpl(
ownerModel.toPlainObject(),
Amount(currency.toPlainObject(), it.balance),
currency.toPlainObject(),
- it.type
+ it.type,
+ it.version
)
}
}
@@ -211,7 +226,8 @@ class WalletManagerImpl(
owner,
Amount(currency, walletModel.balance),
currency,
- walletModel.type
+ walletModel.type,
+ walletModel.version
)
return wallet
@@ -229,7 +245,8 @@ class WalletManagerImpl(
walletOwnerRepository.findById(walletModel.owner).awaitFirst().toPlainObject(),
Amount(existingCurrency.toPlainObject(), walletModel.balance),
existingCurrency.toPlainObject(),
- walletModel.type
+ walletModel.type,
+ walletModel.version
)
}
}
diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/WalletModel.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/WalletModel.kt
index 5f551f132..a3a47f2fb 100644
--- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/WalletModel.kt
+++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/WalletModel.kt
@@ -1,6 +1,7 @@
package co.nilin.opex.wallet.ports.postgres.model
import org.springframework.data.annotation.Id
+import org.springframework.data.annotation.Version
import org.springframework.data.relational.core.mapping.Column
import org.springframework.data.relational.core.mapping.Table
import java.math.BigDecimal
@@ -11,5 +12,7 @@ data class WalletModel(
@Column("owner") val owner: Long,
@Column("wallet_type") val type: String,
@Column("currency") val currency: String,
- @Column("balance") val balance: BigDecimal
+ @Column("balance") val balance: BigDecimal,
+ @Version
+ var version: Long? = null
)
\ No newline at end of file
diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/resources/schema.sql b/wallet/wallet-ports/wallet-persister-postgres/src/main/resources/schema.sql
index be2d04f4d..77a6766d8 100644
--- a/wallet/wallet-ports/wallet-persister-postgres/src/main/resources/schema.sql
+++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/resources/schema.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS wallet
UNIQUE (owner, wallet_type, currency)
);
+ALTER TABLE wallet ADD COLUMN IF NOT EXISTS version INTEGER;
+
CREATE TABLE IF NOT EXISTS transaction
(
id SERIAL PRIMARY KEY,
diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerTest.kt b/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerTest.kt
index dbf26fe37..97de3e17c 100644
--- a/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerTest.kt
+++ b/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerTest.kt
@@ -382,7 +382,7 @@ private class WalletManagerTest {
@Test
fun givenWrongAmount_whenIncreaseBalance_thenThrow(): Unit = runBlocking {
every {
- walletRepository.updateBalance(eq(20), any())
+ walletRepository.updateBalance(eq(20), any(), any())
} returns Mono.just(0)
assertThatThrownBy {
@@ -398,7 +398,7 @@ private class WalletManagerTest {
@Test
fun givenWallet_whenDecreaseBalance_thenSuccess(): Unit = runBlocking {
every {
- walletRepository.updateBalance(eq(VALID.WALLET.id!!), eq(BigDecimal.valueOf(-1)))
+ walletRepository.updateBalance(eq(VALID.WALLET.id!!), eq(BigDecimal.valueOf(-1)), any())
} returns Mono.just(1)
assertThatNoException().isThrownBy {
@@ -414,7 +414,7 @@ private class WalletManagerTest {
@Test
fun givenNoWallet_whenDecreaseBalance_thenThrow(): Unit = runBlocking {
every {
- walletRepository.updateBalance(any(), eq(BigDecimal.valueOf(-1)))
+ walletRepository.updateBalance(any(), eq(BigDecimal.valueOf(-1)), any())
} returns Mono.just(0)
assertThatThrownBy {
@@ -430,7 +430,7 @@ private class WalletManagerTest {
@Test
fun givenWrongAmount_whenDecreaseBalance_thenThrow(): Unit = runBlocking {
every {
- walletRepository.updateBalance(eq(VALID.WALLET_OWNER.id!!), eq(BigDecimal.valueOf(-1)))
+ walletRepository.updateBalance(eq(VALID.WALLET_OWNER.id!!), eq(BigDecimal.valueOf(-1)), any())
} returns Mono.just(0)
assertThatThrownBy {
diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/sample/Samples.kt b/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/sample/Samples.kt
index a250f5c62..e4c97e916 100644
--- a/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/sample/Samples.kt
+++ b/wallet/wallet-ports/wallet-persister-postgres/src/test/kotlin/co/nilin/opex/wallet/ports/postgres/impl/sample/Samples.kt
@@ -33,7 +33,8 @@ object VALID {
WALLET_OWNER,
Amount(CURRENCY, BigDecimal.valueOf(1.5)),
CURRENCY,
- WALLET_TYPE_MAIN
+ WALLET_TYPE_MAIN,
+ 0
)
val WALLET_LIMITS_MODEL_WITHDRAW = WalletLimitsModel(