diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/TransferRequest.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/TransferRequest.kt new file mode 100644 index 000000000..5203b756f --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/TransferRequest.kt @@ -0,0 +1,14 @@ +package co.nilin.opex.accountant.core.inout + +import java.math.BigDecimal + +data class TransferRequest( + val amount: BigDecimal, + val symbol: String, + val senderUuid: String, + val senderWalletType: String, + val receiverUuid: String, + val receiverWalletType: String, + val transferRef: String?, + val description: String? +) \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt index cee6db269..944bb4a22 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt @@ -1,6 +1,8 @@ package co.nilin.opex.accountant.core.service import co.nilin.opex.accountant.core.api.FinancialActionJobManager +import co.nilin.opex.accountant.core.inout.TransferRequest +import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.spi.FinancialActionLoader import co.nilin.opex.accountant.core.spi.FinancialActionPersister @@ -13,31 +15,44 @@ class FinancialActionJobManagerImpl( private val walletProxy: WalletProxy ) : FinancialActionJobManager { - private val retryLimit = 10 - private val log = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java) + private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java) override suspend fun processFinancialActions(offset: Long, size: Long) { val factions = financialActionLoader.loadUnprocessed(offset, size) - factions.forEach { - try { - walletProxy.transfer( + val flatten = sortAndFlattenFA(factions) + logger.info("Loaded ${flatten.size} factions: ${flatten.map { it.id }}") + if (factions.isEmpty()) + return + + try { + val requests = factions.map { + TransferRequest( + it.amount, it.symbol, - it.senderWalletType, it.sender, - it.receiverWalletType, + it.senderWalletType, it.receiver, - it.amount, - it.eventType + it.pointer, - null - ) - financialActionPersister.updateStatus(it, FinancialActionStatus.PROCESSED) - } catch (e: Exception) { - log.error("financial job error", e) - financialActionPersister.updateStatus( - it, - if (it.retryCount >= retryLimit) FinancialActionStatus.ERROR else FinancialActionStatus.CREATED + it.receiverWalletType, + null, + it.eventType + it.pointer ) } + walletProxy.batchTransfer(requests) + financialActionPersister.updateBatchStatus(factions, FinancialActionStatus.PROCESSED) + } catch (e: Exception) { + logger.error("financial job error", e) + } + } + + fun sortAndFlattenFA(list: List): Collection { + val result = arrayListOf() + + fun extractParent(fa: FinancialAction) { + if (fa.parent != null) + extractParent(fa.parent) + result.add(fa) } + list.forEach { extractParent(it) } + return result.distinctBy { it.id } } } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt index 53d0db4ff..6b66dc4c3 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt @@ -8,4 +8,6 @@ interface FinancialActionPersister { suspend fun persist(financialActions: List): List suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) + + suspend fun updateBatchStatus(financialAction: List, status: FinancialActionStatus) } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/WalletProxy.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/WalletProxy.kt index ed27996ca..a5ab84a65 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/WalletProxy.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/WalletProxy.kt @@ -1,5 +1,6 @@ package co.nilin.opex.accountant.core.spi +import co.nilin.opex.accountant.core.inout.TransferRequest import java.math.BigDecimal interface WalletProxy { @@ -15,5 +16,7 @@ interface WalletProxy { transferRef: String? ) + suspend fun batchTransfer(transfers: List) + suspend fun canFulfil(symbol: String, walletType: String, uuid: String, amount: BigDecimal): Boolean } \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt index 3318eac7d..d2ba203c6 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt @@ -1,6 +1,6 @@ package co.nilin.opex.accountant.core.service -import co.nilin.opex.accountant.core.model.FinancialActionStatus +import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.spi.FinancialActionLoader import co.nilin.opex.accountant.core.spi.FinancialActionPersister import co.nilin.opex.accountant.core.spi.WalletProxy @@ -9,6 +9,9 @@ import io.mockk.coVerify import io.mockk.mockk import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test +import java.math.BigDecimal +import java.time.LocalDateTime +import org.assertj.core.api.Assertions.assertThat class FAJobManagerImplTest { @@ -20,93 +23,36 @@ class FAJobManagerImplTest { init { coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.fa, Valid.fa) - coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit - } - - @Test - fun given2FALoaded_whenProcessing_thenVerifyThatTransferProxyCalled2Times() = runBlocking { - coEvery { walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) } returns Unit - sut.processFinancialActions(0, 2) - with(Valid.fa) { - coVerify(exactly = 2) { - walletProxy.transfer( - eq(symbol), - eq(senderWalletType), - eq(sender), - eq(receiverWalletType), - eq(receiver), - eq(amount), - eq(eventType + pointer), - any() - ) - } - coVerify(exactly = 2) { - financialActionPersister.updateStatus( - eq(this@with), - eq(FinancialActionStatus.PROCESSED) - ) - } - } + coEvery { financialActionPersister.updateBatchStatus(any(), any()) } returns Unit } @Test fun given2FALoaded_whenProcessingFailed_thenUpdateStatusCalledRegardless() = runBlocking { coEvery { - walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) + walletProxy.batchTransfer(any()) } throws IllegalStateException() sut.processFinancialActions(0, 2) - with(Valid.fa) { - coVerify(exactly = 2) { - walletProxy.transfer( - eq(symbol), - eq(senderWalletType), - eq(sender), - eq(receiverWalletType), - eq(receiver), - eq(amount), - eq(eventType + pointer), - any() - ) - } - coVerify(exactly = 2) { - financialActionPersister.updateStatus( - eq(this@with), - eq(FinancialActionStatus.CREATED) - ) - } + coVerify(exactly = 1) { + walletProxy.batchTransfer(any()) } } @Test - fun given2FALoaded_whenProcessingFailedAndRetryCountExceeded_thenUpdateStatusCalledRegardless() = runBlocking { - coEvery { - walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) - } throws IllegalStateException() - - coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.faHighRetry) - - sut.processFinancialActions(0, 1) - with(Valid.faHighRetry) { - coVerify(exactly = 1) { - walletProxy.transfer( - eq(symbol), - eq(senderWalletType), - eq(sender), - eq(receiverWalletType), - eq(receiver), - eq(amount), - eq(eventType + pointer), - any() - ) - } - coVerify(exactly = 1) { - financialActionPersister.updateStatus( - eq(this@with), - eq(FinancialActionStatus.ERROR) - ) - } - } + fun givenFALoaded_validateParentsAreFirstInLine(): Unit = runBlocking { + val fa1 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 1) + val fa2 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 2) + val fa3 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 3) + val fa4 = FinancialAction(fa3, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 4) + val fa5 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 5) + val list = arrayListOf(fa5, fa4, fa3, fa2, fa1) + + val flatten = sut.sortAndFlattenFA(list) + + assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa2)) + assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa3)) + assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa4)) + assertThat(flatten.indexOf(fa3)).isLessThan(flatten.indexOf(fa4)) } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/pom.xml b/accountant/accountant-ports/accountant-persister-postgres/pom.xml index 331727e89..8d0ea453d 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/pom.xml +++ b/accountant/accountant-ports/accountant-persister-postgres/pom.xml @@ -33,7 +33,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt index 1090e0859..4ef86a1fa 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt @@ -37,4 +37,7 @@ interface FinancialActionRepository : ReactiveCrudRepository + + @Query("update fi_actions set status = :status where id in (:ids)") + fun updateStatus(ids: List, status: FinancialActionStatus): Mono } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt index fb82e954d..7cb5bb08a 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt @@ -37,4 +37,8 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ override suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) { financialActionRepository.updateStatusAndIncreaseRetry(financialAction.id!!, status).awaitFirstOrNull() } + + override suspend fun updateBatchStatus(financialAction: List, status: FinancialActionStatus) { + financialActionRepository.updateStatus(financialAction.mapNotNull { it.id }, status).awaitFirstOrNull() + } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-wallet-proxy/pom.xml b/accountant/accountant-ports/accountant-wallet-proxy/pom.xml index 7c3460d87..bb691dafd 100644 --- a/accountant/accountant-ports/accountant-wallet-proxy/pom.xml +++ b/accountant/accountant-ports/accountant-wallet-proxy/pom.xml @@ -45,7 +45,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/accountant/accountant-ports/accountant-wallet-proxy/src/main/kotlin/co/nilin/opex/accountant/ports/walletproxy/proxy/WalletProxyImpl.kt b/accountant/accountant-ports/accountant-wallet-proxy/src/main/kotlin/co/nilin/opex/accountant/ports/walletproxy/proxy/WalletProxyImpl.kt index 70500e611..80ff777d0 100644 --- a/accountant/accountant-ports/accountant-wallet-proxy/src/main/kotlin/co/nilin/opex/accountant/ports/walletproxy/proxy/WalletProxyImpl.kt +++ b/accountant/accountant-ports/accountant-wallet-proxy/src/main/kotlin/co/nilin/opex/accountant/ports/walletproxy/proxy/WalletProxyImpl.kt @@ -1,13 +1,18 @@ package co.nilin.opex.accountant.ports.walletproxy.proxy +import co.nilin.opex.accountant.core.inout.TransferRequest import co.nilin.opex.accountant.core.spi.WalletProxy import co.nilin.opex.accountant.ports.walletproxy.data.BooleanResponse import co.nilin.opex.accountant.ports.walletproxy.data.TransferResult import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull import org.springframework.beans.factory.annotation.Value +import org.springframework.http.MediaType import org.springframework.stereotype.Component import org.springframework.web.reactive.function.client.WebClient +import org.springframework.web.reactive.function.client.body import org.springframework.web.reactive.function.client.bodyToMono +import reactor.core.publisher.Mono import java.math.BigDecimal @Component @@ -28,7 +33,7 @@ class WalletProxyImpl( ) { webClient.post() .uri("$walletBaseUrl/transfer/${amount}_$symbol/from/${senderUuid}_$senderWalletType/to/${receiverUuid}_$receiverWalletType") - .header("Content-Type", "application/json") + .contentType(MediaType.APPLICATION_JSON) .retrieve() .onStatus({ t -> t.isError }, { it.createException() }) .bodyToMono() @@ -36,6 +41,18 @@ class WalletProxyImpl( .awaitFirst() } + override suspend fun batchTransfer(transfers: List) { + webClient.post() + .uri("$walletBaseUrl/transfer/batch") + .contentType(MediaType.APPLICATION_JSON) + .body(Mono.just(transfers)) + .retrieve() + .onStatus({ t -> t.isError }, { it.createException() }) + .bodyToMono() + .log() + .awaitFirstOrNull() + } + override suspend fun canFulfil(symbol: String, walletType: String, uuid: String, amount: BigDecimal): Boolean { return webClient.get() .uri("$walletBaseUrl/$uuid/wallet_type/$walletType/can_withdraw/${amount}_$symbol") diff --git a/api/api-ports/api-binance-rest/pom.xml b/api/api-ports/api-binance-rest/pom.xml index 66ddd73a5..742dcd9ee 100644 --- a/api/api-ports/api-binance-rest/pom.xml +++ b/api/api-ports/api-binance-rest/pom.xml @@ -53,7 +53,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt index 4edfd83d2..4bb21427a 100644 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt +++ b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt @@ -90,6 +90,7 @@ class AccountController( @CurrentSecurityContext securityContext: SecurityContext ): NewOrderResponse { val internalSymbol = symbolMapper.toInternalSymbol(symbol) ?: throw OpexException(OpexError.SymbolNotFound) + validateNewOrderParams(type, price, quantity, timeInForce, stopPrice, quoteOrderQty) matchingGatewayProxy.createNewOrder( securityContext.jwtAuthentication().name, @@ -414,6 +415,69 @@ class AccountController( ) } + private fun validateNewOrderParams( + type: OrderType, + price: BigDecimal?, + quantity: BigDecimal?, + timeInForce: TimeInForce?, + stopPrice: BigDecimal?, + quoteOrderQty: BigDecimal?, + ) { + when (type) { + OrderType.LIMIT -> { + checkDecimal(price, "price") + checkDecimal(quantity, "quantity") + checkNull(timeInForce, "timeInForce") + } + + OrderType.MARKET -> { + if (quantity == null) + checkDecimal(quoteOrderQty, "quoteOrderQty") + else + checkDecimal(quantity, "quantity") + } + + OrderType.STOP_LOSS -> { + checkDecimal(quantity, "quantity") + checkDecimal(stopPrice, "stopPrice") + } + + OrderType.STOP_LOSS_LIMIT -> { + checkDecimal(price, "price") + checkDecimal(quantity, "quantity") + checkDecimal(stopPrice, "stopPrice") + checkNull(timeInForce, "timeInForce") + } + + OrderType.TAKE_PROFIT -> { + checkDecimal(quantity, "quantity") + checkDecimal(stopPrice, "stopPrice") + } + + OrderType.TAKE_PROFIT_LIMIT -> { + checkDecimal(price, "price") + checkDecimal(quantity, "quantity") + checkDecimal(stopPrice, "stopPrice") + checkNull(timeInForce, "timeInForce") + } + + OrderType.LIMIT_MAKER -> { + checkDecimal(price, "price") + checkDecimal(quantity, "quantity") + } + } + } + + private fun checkDecimal(decimal: BigDecimal?, paramName: String) { + if (decimal == null || decimal <= BigDecimal.ZERO) + throw OpexException(OpexError.InvalidRequestParam, "Parameter '$paramName' is either missing or invalid") + } + + private fun checkNull(obj: Any?, paramName: String) { + if (obj == null) + throw OpexException(OpexError.InvalidRequestParam, "Parameter '$paramName' is either missing or invalid") + } + private fun Order.asQueryOrderResponse() = QueryOrderResponse( symbol, ouid, diff --git a/api/api-ports/api-persister-postgres/pom.xml b/api/api-ports/api-persister-postgres/pom.xml index ee76e7b1a..83c6cd821 100644 --- a/api/api-ports/api-persister-postgres/pom.xml +++ b/api/api-ports/api-persister-postgres/pom.xml @@ -33,7 +33,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml index 7de49a35a..d5bda18ac 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml @@ -25,7 +25,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/docker-images/vault/Dockerfile b/docker-images/vault/Dockerfile index c252d77bb..a4e183276 100644 --- a/docker-images/vault/Dockerfile +++ b/docker-images/vault/Dockerfile @@ -1,4 +1,4 @@ -FROM vault:1.10.1 +FROM vault:1.12.2 COPY ["backend-policy.hcl", "panel-policy.hcl", "vault.json", "workflow-vault.sh", "/vault/config/"] EXPOSE 8200 ENTRYPOINT /vault/config/workflow-vault.sh diff --git a/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml b/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml index c65e3647d..be2b16d8f 100644 --- a/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml +++ b/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml @@ -33,7 +33,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/market/market-ports/market-persister-postgres/pom.xml b/market/market-ports/market-persister-postgres/pom.xml index df22b6931..62f06eade 100644 --- a/market/market-ports/market-persister-postgres/pom.xml +++ b/market/market-ports/market-persister-postgres/pom.xml @@ -33,7 +33,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt index 2a5e91d4b..810cc540a 100644 --- a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt +++ b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt @@ -44,6 +44,7 @@ interface OrderRepository : ReactiveCrudRepository { where uuid = :uuid and (:symbol is null or symbol = :symbol) and status in (:statuses) and appearance = (select max(appearance) from order_status where ouid = orders.ouid) and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid) + order by create_date desc limit :limit """ ) diff --git a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/TradeRepository.kt b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/TradeRepository.kt index e3e107a78..8508f5335 100644 --- a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/TradeRepository.kt +++ b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/TradeRepository.kt @@ -46,7 +46,7 @@ interface TradeRepository : ReactiveCrudRepository { startTime: Date?, @Param("endTime") endTime: Date?, - limit:Int + limit: Int ): Flow @Query("select * from trades where symbol = :symbol order by create_date desc limit :limit") @@ -59,12 +59,14 @@ interface TradeRepository : ReactiveCrudRepository { @Query( """ + with first_trade as (select * from trades where create_date > :date order by create_date limit 1), + last_trade as (select * from trades where create_date > :date order by create_date desc limit 1) select symbol, - (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date limit 1) as price_change, - ((((select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date limit 1))/(select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date limit 1))*100) as price_change_percent, - (sum(matched_quantity)/sum(taker_price)) as weighted_avg_price, - (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date limit 1) as last_price, - (select matched_quantity from trades where create_date > :date and symbol=t.symbol order by create_date limit 1) as last_qty, + (select matched_price from last_trade where symbol=t.symbol) - (select matched_price from first_trade where symbol=t.symbol) as price_change, + ((((select matched_price from last_trade where symbol=t.symbol) - (select matched_price from first_trade where symbol=t.symbol))/(select matched_price from first_trade where symbol=t.symbol))*100) as price_change_percent, + (sum(matched_quantity)/sum(matched_price)) as weighted_avg_price, + (select matched_price from last_trade where symbol=t.symbol) as last_price, + (select matched_quantity from last_trade where symbol=t.symbol) as last_qty, ( select price from orders join order_status os on orders.ouid = os.ouid @@ -89,11 +91,11 @@ interface TradeRepository : ReactiveCrudRepository { and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid) order by create_date desc limit 1 ) as open_price, - max(taker_price) as high_price, - min(taker_price) as low_price, + max(matched_price) as high_price, + min(matched_price) as low_price, sum(matched_quantity) as volume, - (select id from trades where create_date > :date and symbol=t.symbol order by create_date limit 1) as first_id, - (select id from trades where create_date > :date and symbol=t.symbol order by create_date desc limit 1) as last_id, + (select id from first_trade where symbol=t.symbol) as first_id, + (select id from last_trade where symbol=t.symbol) as last_id, count(id) as count from trades as t where create_date > :date @@ -104,12 +106,14 @@ interface TradeRepository : ReactiveCrudRepository { @Query( """ + with first_trade as (select * from trades where create_date > :date and symbol = :symbol order by create_date limit 1), + last_trade as (select * from trades where create_date > :date and symbol = :symbol order by create_date desc limit 1) select symbol, - (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date limit 1) as price_change, - ((((select taker_price from trades where create_date > :date and symbol=:symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date limit 1))/(select taker_price from trades where create_date > :date and symbol=:symbol order by create_date limit 1))*100) as price_change_percent, - (sum(matched_quantity)/sum(taker_price)) as weighted_avg_price, - (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date limit 1) as last_price, - (select matched_quantity from trades where create_date > :date and symbol=:symbol order by create_date limit 1) as last_qty, + (select matched_price from last_trade) - (select matched_price from first_trade) as price_change, + ((((select matched_price from last_trade) - (select matched_price from first_trade))/(select matched_price from first_trade))*100) as price_change_percent, + (sum(matched_quantity)/sum(matched_price)) as weighted_avg_price, + (select matched_price from last_trade) as last_price, + (select matched_quantity from last_trade) as last_qty, ( select price from orders join order_status os on orders.ouid = os.ouid @@ -134,11 +138,11 @@ interface TradeRepository : ReactiveCrudRepository { and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid) order by create_date desc limit 1 ) as open_price, - max(taker_price) as high_price, - min(taker_price) as low_price, + max(matched_price) as high_price, + min(matched_price) as low_price, sum(matched_quantity) as volume, - (select id from trades where create_date > :date and symbol=:symbol order by create_date limit 1) as first_id, - (select id from trades where create_date > :date and symbol=:symbol order by create_date desc limit 1) as last_id, + (select id from first_trade) as first_id, + (select id from last_trade) as last_id, count(id) as count from trades as t where create_date > :date and symbol = :symbol @@ -241,10 +245,10 @@ interface TradeRepository : ReactiveCrudRepository { select f.start_time as open_time, f.end_time as close_time, - (select taker_price from trades tt where symbol = :symbol and tt.create_date >= f.start_time and tt.create_date < f.end_time order by tt.create_date limit 1) as open, - max(t.taker_price) as high, - min(t.taker_price) as low, - (select taker_price from trades tt where symbol = :symbol and tt.create_date >= f.start_time and tt.create_date < f.end_time order by tt.create_date desc limit 1) as close, + (select matched_price from trades tt where symbol = :symbol and tt.create_date >= f.start_time and tt.create_date < f.end_time order by tt.create_date limit 1) as open, + max(t.matched_price) as high, + min(t.matched_price) as low, + (select matched_price from trades tt where symbol = :symbol and tt.create_date >= f.start_time and tt.create_date < f.end_time order by tt.create_date desc limit 1) as close, sum(t.matched_quantity) as volume, count(id) as trades from trades t @@ -252,7 +256,7 @@ interface TradeRepository : ReactiveCrudRepository { on t.create_date >= f.start_time and t.create_date < f.end_time where symbol = :symbol or symbol is null group by f.start_time, f.end_time - order by f.start_time asc + order by f.start_time limit :limit """ ) @@ -283,21 +287,23 @@ interface TradeRepository : ReactiveCrudRepository { @Query( """ + with first_trade as (select matched_price, symbol from trades where create_date > :since order by create_date limit 1), + last_trade as (select matched_price, symbol from trades where create_date > :since order by create_date desc limit 1) select symbol, - coalesce((select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1), 0.0) as last_price, + coalesce((select matched_price from last_trade where symbol = t.symbol), 0.0) as last_price, coalesce( max( - (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1) - - (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) + (select matched_price from last_trade where symbol = t.symbol) + - (select matched_price from first_trade where symbol = t.symbol) ), 0.0 ) as price_change, coalesce( ( - (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1) - - (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) - ) / (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) * 100, + (select matched_price from last_trade where symbol = t.symbol) + -(select matched_price from first_trade where symbol = t.symbol) + ) / (select matched_price from first_trade where symbol = t.symbol) * 100, 0.0 ) as price_change_percent from trades t @@ -310,21 +316,23 @@ interface TradeRepository : ReactiveCrudRepository { @Query( """ + with first_trade as (select matched_price, symbol from trades where create_date > :since order by create_date limit 1), + last_trade as (select matched_price, symbol from trades where create_date > :since order by create_date desc limit 1) select symbol, - coalesce((select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1), 0.0) as last_price, + coalesce((select matched_price from last_trade where symbol = t.symbol), 0.0) as last_price, coalesce( max( - (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1) - - (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) + (select matched_price from last_trade where symbol = t.symbol) + - (select matched_price from first_trade where symbol = t.symbol) ), 0.0 ) as price_change, coalesce( ( - (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1) - -(select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) - ) / (select matched_price from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) * 100, + (select matched_price from last_trade where symbol = t.symbol) + -(select matched_price from first_trade where symbol = t.symbol) + ) / (select matched_price from first_trade where symbol = t.symbol) * 100, 0.0 ) as price_change_percent from trades t @@ -337,15 +345,17 @@ interface TradeRepository : ReactiveCrudRepository { @Query( """ + with first_trade as (select matched_quantity as mq, symbol from trades where create_date > :since order by create_date limit 1), + last_trade as (select matched_quantity as mq, symbol from trades where create_date > :since order by create_date desc limit 1) select symbol, coalesce(sum(matched_quantity), 0.0) as volume, count(id) as trade_count, coalesce( ( - (select matched_quantity from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1) - - (select matched_quantity from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) - ) / (select matched_quantity from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) * 100, + (select mq from last_trade where symbol = t.symbol) + - (select mq from first_trade where symbol = t.symbol) + ) / (select mq from first_trade where symbol = t.symbol) * 100, 0.0 ) as change from trades t @@ -359,15 +369,17 @@ interface TradeRepository : ReactiveCrudRepository { @Query( """ + with first_trade as (select matched_quantity as mq, symbol from trades where create_date > :since order by create_date limit 1), + last_trade as (select matched_quantity as mq, symbol from trades where create_date > :since order by create_date desc limit 1) select symbol, coalesce(sum(matched_quantity), 0.0) as volume, count(id) as trade_count, coalesce( ( - (select matched_quantity from trades where create_date > :since and symbol = t.symbol order by create_date desc limit 1) - - (select matched_quantity from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) - ) / (select matched_quantity from trades where create_date > :since and symbol = t.symbol order by create_date limit 1) * 100, + (select mq from last_trade where symbol = t.symbol) + - (select mq from first_trade where symbol = t.symbol) + ) / (select mq from first_trade where symbol = t.symbol) * 100, 0.0 ) as change from trades t diff --git a/pom.xml b/pom.xml index 4db68166a..ec0f9f998 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ 11 11 1.6.0 - 2.6.2 - 2021.0.0 + 2.7.6 + 2021.0.5 true diff --git a/referral/referral-ports/referral-persister-postgres/pom.xml b/referral/referral-ports/referral-persister-postgres/pom.xml index bdb72669d..b16283529 100644 --- a/referral/referral-ports/referral-persister-postgres/pom.xml +++ b/referral/referral-ports/referral-persister-postgres/pom.xml @@ -25,7 +25,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/referral/referral-ports/referral-wallet-proxy/pom.xml b/referral/referral-ports/referral-wallet-proxy/pom.xml index 31c367031..f634dd19c 100644 --- a/referral/referral-ports/referral-wallet-proxy/pom.xml +++ b/referral/referral-ports/referral-wallet-proxy/pom.xml @@ -25,7 +25,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt index 667bf8c59..5d4479ae3 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt @@ -7,8 +7,8 @@ import co.nilin.opex.wallet.app.dto.PaymentDepositRequest import co.nilin.opex.wallet.app.dto.PaymentDepositResponse import co.nilin.opex.wallet.core.inout.TransferCommand import co.nilin.opex.wallet.core.model.Amount -import co.nilin.opex.wallet.core.service.TransferService import co.nilin.opex.wallet.core.spi.CurrencyService +import co.nilin.opex.wallet.core.spi.TransferManager import co.nilin.opex.wallet.core.spi.WalletManager import co.nilin.opex.wallet.core.spi.WalletOwnerManager import org.springframework.web.bind.annotation.PostMapping @@ -20,7 +20,7 @@ import java.math.BigDecimal @RestController @RequestMapping("/payment") class PaymentGatewayController( - val transferService: TransferService, + val transferManager: TransferManager, val currencyService: CurrencyService, val walletManager: WalletManager, val walletOwnerManager: WalletOwnerManager @@ -55,7 +55,7 @@ class PaymentGatewayController( receiverWalletType ) - val command = transferService.transfer( + val command = transferManager.transfer( TransferCommand( sourceWallet, receiverWallet, diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt index 1cce26772..9006d0e07 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt @@ -1,30 +1,17 @@ package co.nilin.opex.wallet.app.controller -import co.nilin.opex.utility.error.data.OpexError -import co.nilin.opex.utility.error.data.OpexException -import co.nilin.opex.wallet.core.inout.TransferCommand +import co.nilin.opex.wallet.app.dto.TransferRequest +import co.nilin.opex.wallet.app.service.TransferService import co.nilin.opex.wallet.core.inout.TransferResult -import co.nilin.opex.wallet.core.model.Amount -import co.nilin.opex.wallet.core.service.TransferService -import co.nilin.opex.wallet.core.spi.CurrencyService -import co.nilin.opex.wallet.core.spi.WalletManager -import co.nilin.opex.wallet.core.spi.WalletOwnerManager import io.swagger.annotations.ApiResponse import io.swagger.annotations.Example import io.swagger.annotations.ExampleProperty -import org.springframework.web.bind.annotation.PathVariable -import org.springframework.web.bind.annotation.PostMapping -import org.springframework.web.bind.annotation.RequestParam -import org.springframework.web.bind.annotation.RestController +import org.springframework.web.bind.annotation.* import java.math.BigDecimal @RestController -class TransferController( - val transferService: TransferService, - val currencyService: CurrencyService, - val walletManager: WalletManager, - val walletOwnerManager: WalletOwnerManager -) { +class TransferController(private val transferService: TransferService) { + @PostMapping("/transfer/{amount}_{symbol}/from/{senderUuid}_{senderWalletType}/to/{receiverUuid}_{receiverWalletType}") @ApiResponse( message = "OK", @@ -46,35 +33,21 @@ class TransferController( @RequestParam("description") description: String?, @RequestParam("transferRef") transferRef: String? ): TransferResult { - if (senderWalletType == "cashout" || receiverWalletType == "cashout") - throw OpexException(OpexError.InvalidCashOutUsage) - val currency = currencyService.getCurrency(symbol) ?: throw OpexException(OpexError.CurrencyNotFound) - val sourceOwner = walletOwnerManager.findWalletOwner(senderUuid) - ?: throw OpexException(OpexError.WalletOwnerNotFound) - val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner, senderWalletType, currency) - ?: throw OpexException(OpexError.WalletNotFound) - - val receiverOwner = walletOwnerManager.findWalletOwner(receiverUuid) ?: walletOwnerManager.createWalletOwner( + return transferService.transfer( + symbol, + senderWalletType, senderUuid, - "not set", - "" - ) - val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType( - receiverOwner, receiverWalletType, currency - ) ?: walletManager.createWallet( - receiverOwner, - Amount(currency, BigDecimal.ZERO), - currency, - receiverWalletType + receiverWalletType, + receiverUuid, + amount, + description, + transferRef ) - return transferService.transfer( - TransferCommand( - sourceWallet, - receiverWallet, - Amount(sourceWallet.currency, amount), - description, transferRef, emptyMap() - ) - ).transferResult + } + + @PostMapping("/transfer/batch") + suspend fun batchTransfer(@RequestBody request: List) { + transferService.batchTransfer(request) } @PostMapping("/deposit/{amount}_{symbol}/{receiverUuid}_{receiverWalletType}") @@ -96,34 +69,6 @@ class TransferController( @RequestParam("description") description: String?, @RequestParam("transferRef") transferRef: String? ): TransferResult { - if (receiverWalletType == "cashout") throw OpexException(OpexError.InvalidCashOutUsage) - val systemUuid = "1" - val currency = currencyService.getCurrency(symbol) ?: throw OpexException(OpexError.CurrencyNotFound) - val sourceOwner = walletOwnerManager.findWalletOwner(systemUuid) - ?: throw OpexException(OpexError.WalletOwnerNotFound) - val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner, "main", currency) - ?: throw OpexException(OpexError.WalletNotFound) - - val receiverOwner = walletOwnerManager.findWalletOwner(receiverUuid) ?: walletOwnerManager.createWalletOwner( - systemUuid, - "not set", - "" - ) - val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType( - receiverOwner, receiverWalletType, currency - ) ?: walletManager.createWallet( - receiverOwner, - Amount(currency, BigDecimal.ZERO), - currency, - receiverWalletType - ) - return transferService.transfer( - TransferCommand( - sourceWallet, - receiverWallet, - Amount(sourceWallet.currency, amount), - description, transferRef, emptyMap() - ) - ).transferResult + return transferService.deposit(symbol, receiverUuid, receiverWalletType, amount, description, transferRef) } } diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/dto/TransferRequest.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/dto/TransferRequest.kt new file mode 100644 index 000000000..6b449707c --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/dto/TransferRequest.kt @@ -0,0 +1,14 @@ +package co.nilin.opex.wallet.app.dto + +import java.math.BigDecimal + +data class TransferRequest( + val amount: BigDecimal, + val symbol: String, + val senderUuid: String, + val senderWalletType: String, + val receiverUuid: String, + val receiverWalletType: String, + val transferRef: String?, + val description: String? +) \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/TransferService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/TransferService.kt new file mode 100644 index 000000000..4f3ca3ed8 --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/TransferService.kt @@ -0,0 +1,142 @@ +package co.nilin.opex.wallet.app.service + +import co.nilin.opex.utility.error.data.OpexError +import co.nilin.opex.utility.error.data.OpexException +import co.nilin.opex.wallet.app.dto.TransferRequest +import co.nilin.opex.wallet.core.inout.TransferCommand +import co.nilin.opex.wallet.core.inout.TransferResult +import co.nilin.opex.wallet.core.model.Amount +import co.nilin.opex.wallet.core.spi.CurrencyService +import co.nilin.opex.wallet.core.spi.TransferManager +import co.nilin.opex.wallet.core.spi.WalletManager +import co.nilin.opex.wallet.core.spi.WalletOwnerManager +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.math.BigDecimal + +@Service +class TransferService( + private val transferManager: TransferManager, + private val currencyService: CurrencyService, + private val walletManager: WalletManager, + private val walletOwnerManager: WalletOwnerManager +) { + + @Transactional + suspend fun transfer( + symbol: String, + senderWalletType: String, + senderUuid: String, + receiverWalletType: String, + receiverUuid: String, + amount: BigDecimal, + description: String?, + transferRef: String? + ): TransferResult { + if (senderWalletType == "cashout" || receiverWalletType == "cashout") + throw OpexException(OpexError.InvalidCashOutUsage) + val currency = currencyService.getCurrency(symbol) ?: throw OpexException(OpexError.CurrencyNotFound) + val sourceOwner = walletOwnerManager.findWalletOwner(senderUuid) + ?: throw OpexException(OpexError.WalletOwnerNotFound) + val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner, senderWalletType, currency) + ?: throw OpexException(OpexError.WalletNotFound) + + val receiverOwner = walletOwnerManager.findWalletOwner(receiverUuid) ?: walletOwnerManager.createWalletOwner( + senderUuid, + "not set", + "" + ) + val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType( + receiverOwner, receiverWalletType, currency + ) ?: walletManager.createWallet( + receiverOwner, + Amount(currency, BigDecimal.ZERO), + currency, + receiverWalletType + ) + return transferManager.transfer( + TransferCommand( + sourceWallet, + receiverWallet, + Amount(sourceWallet.currency, amount), + description, transferRef, emptyMap() + ) + ).transferResult + } + + @Transactional + suspend fun batchTransfer(request: List) { + request.filter { it.receiverWalletType != "cashout" && it.senderWalletType != "cashout" } + .forEach { + val currency = currencyService.getCurrency(it.symbol) + ?: throw OpexException(OpexError.CurrencyNotFound) + val sourceOwner = walletOwnerManager.findWalletOwner(it.senderUuid) + ?: throw OpexException(OpexError.WalletOwnerNotFound) + val sourceWallet = + walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner, it.senderWalletType, currency) + ?: throw OpexException(OpexError.WalletNotFound) + + val receiverOwner = walletOwnerManager.findWalletOwner(it.receiverUuid) + ?: walletOwnerManager.createWalletOwner(it.senderUuid, "not set", "") + + val receiverWallet = + walletManager.findWalletByOwnerAndCurrencyAndType(receiverOwner, it.receiverWalletType, currency) + ?: walletManager.createWallet( + receiverOwner, + Amount(currency, BigDecimal.ZERO), + currency, + it.receiverWalletType + ) + transferManager.transfer( + TransferCommand( + sourceWallet, + receiverWallet, + Amount(sourceWallet.currency, it.amount), + it.description, + it.transferRef, + emptyMap() + ) + ) + } + } + + suspend fun deposit( + symbol: String, + receiverUuid: String, + receiverWalletType: String, + amount: BigDecimal, + description: String?, + transferRef: String? + ): TransferResult { + if (receiverWalletType == "cashout") throw OpexException(OpexError.InvalidCashOutUsage) + val systemUuid = "1" + val currency = currencyService.getCurrency(symbol) ?: throw OpexException(OpexError.CurrencyNotFound) + val sourceOwner = walletOwnerManager.findWalletOwner(systemUuid) + ?: throw OpexException(OpexError.WalletOwnerNotFound) + val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner, "main", currency) + ?: throw OpexException(OpexError.WalletNotFound) + + val receiverOwner = walletOwnerManager.findWalletOwner(receiverUuid) ?: walletOwnerManager.createWalletOwner( + systemUuid, + "not set", + "" + ) + val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType( + receiverOwner, receiverWalletType, currency + ) ?: walletManager.createWallet( + receiverOwner, + Amount(currency, BigDecimal.ZERO), + currency, + receiverWalletType + ) + return transferManager.transfer( + TransferCommand( + sourceWallet, + receiverWallet, + Amount(sourceWallet.currency, amount), + description, transferRef, emptyMap() + ) + ).transferResult + } + +} \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/WalletType.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/WalletType.kt new file mode 100644 index 000000000..263083892 --- /dev/null +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/WalletType.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.wallet.core.inout + +enum class WalletType { + MAIN, EXCHANGE, CASHOUT +} \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/TransferService.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/TransferManagerImpl.kt similarity index 89% rename from wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/TransferService.kt rename to wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/TransferManagerImpl.kt index 7ef4072c2..e400eaab4 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/TransferService.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/TransferManagerImpl.kt @@ -10,20 +10,22 @@ import co.nilin.opex.wallet.core.inout.TransferResultDetailed import co.nilin.opex.wallet.core.model.Amount import co.nilin.opex.wallet.core.model.Transaction import co.nilin.opex.wallet.core.spi.* +import org.springframework.stereotype.Component import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.time.LocalDateTime import java.util.* -@Service -class TransferService( - val walletManager: WalletManager, - val walletListener: WalletListener, - val walletOwnerManager: WalletOwnerManager, - val transactionManager: TransactionManager -) { +@Component +class TransferManagerImpl( + private val walletManager: WalletManager, + private val walletListener: WalletListener, + private val walletOwnerManager: WalletOwnerManager, + private val transactionManager: TransactionManager +) : TransferManager { + @Transactional - suspend fun transfer(transferCommand: TransferCommand): TransferResultDetailed { + override suspend fun transfer(transferCommand: TransferCommand): TransferResultDetailed { //pre transfer hook (dispatch pre transfer event) val srcWallet = transferCommand.sourceWallet val srcWalletOwner = srcWallet.owner diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/WithdrawService.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/WithdrawService.kt index 789de6001..18c4c0e5d 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/WithdrawService.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/service/WithdrawService.kt @@ -3,10 +3,7 @@ package co.nilin.opex.wallet.core.service import co.nilin.opex.wallet.core.inout.* import co.nilin.opex.wallet.core.model.Amount import co.nilin.opex.wallet.core.model.Withdraw -import co.nilin.opex.wallet.core.spi.CurrencyService -import co.nilin.opex.wallet.core.spi.WalletManager -import co.nilin.opex.wallet.core.spi.WalletOwnerManager -import co.nilin.opex.wallet.core.spi.WithdrawPersister +import co.nilin.opex.wallet.core.spi.* import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional @@ -15,12 +12,12 @@ import java.time.LocalDateTime @Service class WithdrawService( - val withdrawPersister: WithdrawPersister, - val walletManager: WalletManager, - val walletOwnerManager: WalletOwnerManager, - val currencyService: CurrencyService, - val transferService: TransferService, - @Value("\${app.system.uuid}") val systemUuid: String + private val withdrawPersister: WithdrawPersister, + private val walletManager: WalletManager, + private val walletOwnerManager: WalletOwnerManager, + private val currencyService: CurrencyService, + private val transferManager: TransferManager, + @Value("\${app.system.uuid}") private val systemUuid: String ) { @Transactional @@ -38,7 +35,7 @@ class WithdrawService( currency, "cashout" ) - val transferResultDetailed = transferService.transfer( + val transferResultDetailed = transferManager.transfer( TransferCommand( sourceWallet, receiverWallet, @@ -92,7 +89,7 @@ class WithdrawService( sourceWallet.currency, "main" ) - val transferResultDetailed = transferService.transfer( + val transferResultDetailed = transferManager.transfer( TransferCommand( sourceWallet, receiverWallet, @@ -145,7 +142,7 @@ class WithdrawService( sourceWallet.currency, "main" ) - val transferResultDetailed = transferService.transfer( + val transferResultDetailed = transferManager.transfer( TransferCommand( sourceWallet, receiverWallet, diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TransferManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TransferManager.kt new file mode 100644 index 000000000..cfd61b9b1 --- /dev/null +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TransferManager.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.wallet.core.spi + +import co.nilin.opex.wallet.core.inout.TransferCommand +import co.nilin.opex.wallet.core.inout.TransferResultDetailed + +interface TransferManager { + + suspend fun transfer(transferCommand: TransferCommand): TransferResultDetailed + +} \ No newline at end of file diff --git a/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/TransferServiceTest.kt b/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/TransferManagerImplTest.kt similarity index 93% rename from wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/TransferServiceTest.kt rename to wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/TransferManagerImplTest.kt index a66d4faa6..d52adca35 100644 --- a/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/TransferServiceTest.kt +++ b/wallet/wallet-core/src/test/kotlin/co/nilin/opex/wallet/core/service/TransferManagerImplTest.kt @@ -14,13 +14,13 @@ import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.Test -private class TransferServiceTest { +private class TransferManagerImplTest { private val walletOwnerManager: WalletOwnerManager = mockk() private val walletManager: WalletManager = mockk() private val walletListener: WalletListener = mockk() private val transactionManager: TransactionManager = mockk() - private val transferService: TransferService = - TransferService(walletManager, walletListener, walletOwnerManager, transactionManager) + private val transferManager: TransferManagerImpl = + TransferManagerImpl(walletManager, walletListener, walletOwnerManager, transactionManager) private fun stubWalletListener() { coEvery { @@ -48,7 +48,7 @@ private class TransferServiceTest { coEvery { walletListener.onDeposit(any(), any(), any(), any(), any(), any()) } returns Unit coEvery { transactionManager.save(any()) } returns "1" - val result = transferService.transfer(VALID.TRANSFER_COMMAND).transferResult + val result = transferManager.transfer(VALID.TRANSFER_COMMAND).transferResult assertThat(result).isNotNull assertThat(result.sourceUuid).isEqualTo(VALID.SOURCE_WALLET_OWNER.uuid) @@ -81,7 +81,7 @@ private class TransferServiceTest { assertThatThrownBy { runBlocking { - transferService.transfer(VALID.TRANSFER_COMMAND) + transferManager.transfer(VALID.TRANSFER_COMMAND) } }.isNotInstanceOf(MockKException::class.java) } @@ -100,7 +100,7 @@ private class TransferServiceTest { assertThatThrownBy { runBlocking { - transferService.transfer(VALID.TRANSFER_COMMAND) + transferManager.transfer(VALID.TRANSFER_COMMAND) } }.isNotInstanceOf(MockKException::class.java) } @@ -119,7 +119,7 @@ private class TransferServiceTest { assertThatThrownBy { runBlocking { - transferService.transfer(VALID.TRANSFER_COMMAND) + transferManager.transfer(VALID.TRANSFER_COMMAND) } }.isNotInstanceOf(MockKException::class.java) } @@ -138,7 +138,7 @@ private class TransferServiceTest { assertThatThrownBy { runBlocking { - transferService.transfer(VALID.TRANSFER_COMMAND) + transferManager.transfer(VALID.TRANSFER_COMMAND) } }.isNotInstanceOf(MockKException::class.java) } @@ -167,7 +167,7 @@ private class TransferServiceTest { assertThatThrownBy { runBlocking { - transferService.transfer(VALID.TRANSFER_COMMAND) + transferManager.transfer(VALID.TRANSFER_COMMAND) } }.isNotInstanceOf(MockKException::class.java) } diff --git a/wallet/wallet-ports/wallet-persister-postgres/pom.xml b/wallet/wallet-ports/wallet-persister-postgres/pom.xml index bdbf040fe..aec02a1aa 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/pom.xml +++ b/wallet/wallet-ports/wallet-persister-postgres/pom.xml @@ -25,7 +25,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletOwnerManagerImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletOwnerManagerImpl.kt index 904b16593..bdffbf1c3 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletOwnerManagerImpl.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletOwnerManagerImpl.kt @@ -27,120 +27,99 @@ class WalletOwnerManagerImpl( val walletOwnerRepository: WalletOwnerRepository ) : WalletOwnerManager { - val logger = LoggerFactory.getLogger(WalletOwnerManager::class.java) + private val logger = LoggerFactory.getLogger(WalletOwnerManager::class.java) override suspend fun isDepositAllowed(owner: WalletOwner, amount: Amount): Boolean { require(amount.amount >= BigDecimal.ZERO) - var evaluate: Boolean = limitsRepository.findByOwnerAndAction( - owner.id!!, - "deposit" - ).map { limit -> - evaluateLimit(amount.amount, limit, owner, true) - }.onEmpty { - emit(true) - } - .reduce { a, b -> - a && b - } + var evaluate = limitsRepository.findByOwnerAndAction(owner.id!!, "deposit") + .map { limit -> evaluateLimit(amount.amount, limit, owner, true) } + .onEmpty { emit(true) } + .reduce { a, b -> a && b } + if (evaluate) { - evaluate = limitsRepository.findByLevelAndAction( - owner.level, - "deposit" - ) - .map { limit -> - evaluateLimit(amount.amount, limit, owner, true) - }.onEmpty { - emit(true) - }.reduce { a, b -> - a && b - } + evaluate = limitsRepository.findByLevelAndAction(owner.level, "deposit") + .map { limit -> evaluateLimit(amount.amount, limit, owner, true) } + .onEmpty { emit(true) } + .reduce { a, b -> a && b } } + logger.info("isDepositAllowed: {} {}{} {}", owner.uuid, amount.amount, amount.currency.name, evaluate) return evaluate } + override suspend fun isWithdrawAllowed(owner: WalletOwner, amount: Amount): Boolean { + require(amount.amount >= BigDecimal.ZERO) + var evaluate = limitsRepository.findByOwnerAndAction(owner.id!!, "withdraw") + .map { limit -> evaluateLimit(amount.amount, limit, owner, false) } + .onEmpty { emit(true) } + .reduce { a, b -> a && b } + + if (evaluate) { + evaluate = limitsRepository.findByLevelAndAction(owner.level, "withdraw") + .map { limit -> evaluateLimit(amount.amount, limit, owner, false) } + .onEmpty { emit(true) } + .reduce { a, b -> a && b } + } + + logger.info("isWithdrawAllowed: {} {}{} {}", owner.uuid, amount.amount, amount.currency.name, evaluate) + return evaluate + } + private suspend fun evaluateLimit( amount: BigDecimal, limit: WalletLimitsModel?, owner: WalletOwner, deposit: Boolean ): Boolean { + if (limit == null) + return true + var evaluate = true - if (limit != null) { - val mainCurrency = walletConfigRepository.findAll() - .map { t: WalletConfigModel -> t.mainCurrency } - .awaitFirstOrDefault("BTC") - if (limit.dailyCount != null || limit.dailyTotal != null) { + val mainCurrency = walletConfigRepository.findAll() + .map { it.mainCurrency } + .awaitFirstOrDefault("BTC") + + if (limit.dailyCount != null || limit.dailyTotal != null) { + val ts = if (deposit) { + transactionRepository.calculateDepositStatisticsBasedOnCurrency( + owner.id!!, limit.walletType, LocalDateTime.now().minusDays(1) + .withHour(0).withMinute(0).withSecond(0), LocalDateTime.now(), mainCurrency + ) + } else { + transactionRepository.calculateWithdrawStatisticsBasedOnCurrency( + owner.id!!, limit.walletType, LocalDateTime.now().minusDays(1) + .withHour(0).withMinute(0).withSecond(0), LocalDateTime.now(), mainCurrency + ) + }.awaitFirstOrNull() + evaluate = if (ts != null) { + !((limit.dailyCount != null && ts.cnt!! >= limit.dailyCount) + || (limit.dailyTotal != null && ts.total!! >= limit.dailyTotal)) + } else { + limit.dailyTotal?.let { it >= amount } ?: true + } + } + + if (evaluate) { + if (limit.monthlyCount != null || limit.monthlyTotal != null) { val ts = if (deposit) { transactionRepository.calculateDepositStatisticsBasedOnCurrency( - owner.id!!, limit.walletType, LocalDateTime.now().minusDays(1) + owner.id!!, limit.walletType, LocalDateTime.now().minusMonths(1).withDayOfMonth(1) .withHour(0).withMinute(0).withSecond(0), LocalDateTime.now(), mainCurrency ) } else { transactionRepository.calculateWithdrawStatisticsBasedOnCurrency( - owner.id!!, limit.walletType, LocalDateTime.now().minusDays(1) + owner.id!!, limit.walletType, LocalDateTime.now().minusMonths(1).withDayOfMonth(1) .withHour(0).withMinute(0).withSecond(0), LocalDateTime.now(), mainCurrency ) }.awaitFirstOrNull() evaluate = if (ts != null) { - !((limit.dailyCount != null && ts.cnt!! >= limit.dailyCount) - || (limit.dailyTotal != null && ts.total!! >= limit.dailyTotal)) + !((limit.monthlyCount != null && ts.cnt!! >= limit.monthlyCount) + || (limit.monthlyTotal != null && ts.total!! >= limit.monthlyTotal)) } else { - limit.dailyTotal?.let { it >= amount } ?: true + limit.monthlyTotal?.let { it >= amount } ?: true } } - if (evaluate) { - if (limit.monthlyCount != null || limit.monthlyTotal != null) { - val ts = if (deposit) { - transactionRepository.calculateDepositStatisticsBasedOnCurrency( - owner.id!!, limit.walletType, LocalDateTime.now().minusMonths(1).withDayOfMonth(1) - .withHour(0).withMinute(0).withSecond(0), LocalDateTime.now(), mainCurrency - ) - } else { - transactionRepository.calculateWithdrawStatisticsBasedOnCurrency( - owner.id!!, limit.walletType, LocalDateTime.now().minusMonths(1).withDayOfMonth(1) - .withHour(0).withMinute(0).withSecond(0), LocalDateTime.now(), mainCurrency - ) - }.awaitFirstOrNull() - evaluate = if (ts != null) { - !((limit.monthlyCount != null && ts.cnt!! >= limit.monthlyCount) - || (limit.monthlyTotal != null && ts.total!! >= limit.monthlyTotal)) - } else { - limit.monthlyTotal?.let { it >= amount } ?: true - } - } - } - } - return evaluate - } - - override suspend fun isWithdrawAllowed(owner: WalletOwner, amount: Amount): Boolean { - require(amount.amount >= BigDecimal.ZERO) - var evaluate: Boolean = limitsRepository.findByOwnerAndAction( - owner.id!!, - "withdraw" - ) - .map { limit -> - evaluateLimit(amount.amount, limit, owner, false) - }.onEmpty { - emit(true) - }.reduce { a, b -> - a && b - } - if (evaluate) { - evaluate = limitsRepository.findByLevelAndAction( - owner.level, - "withdraw" - ) - .map { limit -> - evaluateLimit(amount.amount, limit, owner, false) - }.onEmpty { - emit(true) - }.reduce { a, b -> - a && b - } } - logger.info("isWithdrawAllowed: {} {}{} {}", owner.uuid, amount.amount, amount.currency.name, evaluate) return evaluate } diff --git a/websocket/websocket-ports/websocket-persister-postgres/pom.xml b/websocket/websocket-ports/websocket-persister-postgres/pom.xml index 7bb7f7a4f..5dd103d7b 100644 --- a/websocket/websocket-ports/websocket-persister-postgres/pom.xml +++ b/websocket/websocket-ports/websocket-persister-postgres/pom.xml @@ -36,7 +36,7 @@ spring-boot-starter-data-r2dbc - io.r2dbc + org.postgresql r2dbc-postgresql runtime