From 6adf3cf80418923385d746ea168af8e9724c6244 Mon Sep 17 00:00:00 2001 From: Peyman Date: Mon, 6 Mar 2023 16:28:04 +0330 Subject: [PATCH 1/6] Squashed commit of the following: commit 65712e3fd6b217b0b395c29e230b7b47df7c18ea Author: Peyman Date: Mon Mar 6 16:23:24 2023 +0330 Finished switching fa manager from proxy to kafka commit 15ff77b4203b6955074fbc1f8e64879de3e52ca0 Author: Peyman Date: Wed Mar 1 19:23:40 2023 +0330 Switch fa manager from proxy to kafka --- .../opex/accountant/app/config/AppConfig.kt | 30 +++-- .../listener/AccountantTempEventListener.kt | 14 ++- .../app/listener/AccountantTradeListener.kt | 9 +- .../accountant/app/listener/OrderListener.kt | 10 +- .../app/scheduler/FinancialActionsJob.kt | 20 +-- .../core/api/FinancialActionJobManager.kt | 5 - .../core/api/FinancialActionProcessor.kt | 12 ++ .../core/inout/FinancialActionEvent.kt | 17 +++ .../accountant/core/model/FinancialAction.kt | 12 +- .../service/FinancialActionJobManagerImpl.kt | 6 +- .../service/FinancialActionProcessorImpl.kt | 59 +++++++++ .../core/service/TradeManagerImpl.kt | 1 - .../core/spi/FinancialActionLoader.kt | 4 +- .../core/spi/FinancialActionPersister.kt | 4 + .../core/spi/FinancialActionPublisher.kt | 7 ++ .../core/service/FAManagerImplTest.kt | 115 ++++++++++++++++++ .../core/service/OrderManagerImplTest.kt | 1 - .../opex/accountant/core/service/Valid.kt | 2 +- .../postgres/dao/FinancialActionRepository.kt | 10 +- .../impl/FinancialActionLoaderImpl.kt | 13 +- .../impl/FinancialActionPersisterImpl.kt | 34 +++++- .../postgres/model/FinancialActionModel.kt | 5 +- .../src/main/resources/schema.sql | 5 +- .../ports/postgres/FAPersisterImplTest.kt | 5 +- .../opex/accountant/ports/postgres/Valid.kt | 8 +- .../submitter/config/KafkaTopicConfig.kt | 8 ++ .../submitter/config/SubmitterKafkaConfig.kt | 14 ++- .../service/FinancialActionSubmitter.kt | 49 ++++++++ .../nilin/opex/wallet/app/config/AppConfig.kt | 6 + .../FinancialActionEventListenerImpl.kt | 35 ++++++ .../listener/config/KafkaProducerConfig.kt | 11 ++ .../listener/config/WalletKafkaConfig.kt | 24 +++- .../consumer/FinancialActionKafkaListener.kt | 24 ++++ .../listener/model/FinancialActionEvent.kt | 46 +++++++ .../spi/FinancialActionEventListener.kt | 11 ++ 35 files changed, 568 insertions(+), 68 deletions(-) delete mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/FinancialActionEvent.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPublisher.kt create mode 100644 accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt create mode 100644 accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/FinancialActionSubmitter.kt create mode 100644 wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/FinancialActionEventListenerImpl.kt create mode 100644 wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/FinancialActionKafkaListener.kt create mode 100644 wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/FinancialActionEvent.kt create mode 100644 wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/FinancialActionEventListener.kt diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index 85d3a389a..de2a219d5 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -5,10 +5,10 @@ import co.nilin.opex.accountant.app.listener.AccountantTempEventListener import co.nilin.opex.accountant.app.listener.AccountantTradeListener import co.nilin.opex.accountant.app.listener.OrderListener import co.nilin.opex.accountant.core.api.FeeCalculator -import co.nilin.opex.accountant.core.api.FinancialActionJobManager +import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.OrderManager import co.nilin.opex.accountant.core.api.TradeManager -import co.nilin.opex.accountant.core.service.FinancialActionJobManagerImpl +import co.nilin.opex.accountant.core.service.FinancialActionProcessorImpl import co.nilin.opex.accountant.core.service.OrderManagerImpl import co.nilin.opex.accountant.core.service.TradeManagerImpl import co.nilin.opex.accountant.core.spi.* @@ -29,12 +29,12 @@ class AppConfig { fun getFinancialActionJobManager( financialActionLoader: FinancialActionLoader, financialActionPersister: FinancialActionPersister, - walletProxy: WalletProxy - ): FinancialActionJobManager { - return FinancialActionJobManagerImpl( + financialActionPublisher: FinancialActionPublisher + ): FinancialActionProcessor { + return FinancialActionProcessorImpl( financialActionLoader, financialActionPersister, - walletProxy + financialActionPublisher ) } @@ -47,7 +47,7 @@ class AppConfig { orderPersister: OrderPersister, tempEventPersister: TempEventPersister, tempEventRepublisher: TempEventRepublisher, - richOrderPublisher: RichOrderPublisher, + richOrderPublisher: RichOrderPublisher ): OrderManager { return OrderManagerImpl( pairConfigLoader, @@ -82,13 +82,16 @@ class AppConfig { } @Bean - fun orderListener(orderManager: OrderManager): OrderListener { - return OrderListener(orderManager) + fun orderListener(orderManager: OrderManager, financialActionProcessor: FinancialActionProcessor): OrderListener { + return OrderListener(orderManager, financialActionProcessor) } @Bean - fun accountantTradeListener(tradeManager: TradeManager): AccountantTradeListener { - return AccountantTradeListener(tradeManager) + fun accountantTradeListener( + tradeManager: TradeManager, + financialActionProcessor: FinancialActionProcessor + ): AccountantTradeListener { + return AccountantTradeListener(tradeManager, financialActionProcessor) } @Bean @@ -99,9 +102,10 @@ class AppConfig { @Bean fun accountantTempEventListener( orderManager: OrderManager, - tradeManager: TradeManager + tradeManager: TradeManager, + financialActionProcessor: FinancialActionProcessor ): AccountantTempEventListener { - return AccountantTempEventListener(orderManager, tradeManager) + return AccountantTempEventListener(orderManager, tradeManager, financialActionProcessor) } @Autowired diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt index 21feb6b90..7bea85fab 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt @@ -1,24 +1,29 @@ package co.nilin.opex.accountant.app.listener +import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.OrderManager import co.nilin.opex.accountant.core.api.TradeManager import co.nilin.opex.accountant.ports.kafka.listener.spi.TempEventListener import co.nilin.opex.matching.engine.core.eventh.events.* import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory class AccountantTempEventListener( private val orderManager: OrderManager, - private val tradeManager: TradeManager + private val tradeManager: TradeManager, + private val financialActionProcessor: FinancialActionProcessor ) : TempEventListener { + private val logger = LoggerFactory.getLogger(AccountantTempEventListener::class.java) + override fun id(): String { return "TempEventListener" } override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) { - println("TempEvent $event") + logger.info("TempEvent received $event") runBlocking { - when (event) { + val fa = when (event) { is CreateOrderEvent -> orderManager.handleNewOrder(event) is RejectOrderEvent -> orderManager.handleRejectOrder(event) is UpdatedOrderEvent -> orderManager.handleUpdateOrder(event) @@ -28,7 +33,8 @@ class AccountantTempEventListener( throw IllegalArgumentException("Event is not accepted ${event::class.java}") } } + financialActionProcessor.process(fa) } - println("onEvent") + logger.info("TempEvent processed") } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt index 024f9034a..bf93778f8 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt @@ -1,12 +1,16 @@ package co.nilin.opex.accountant.app.listener +import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.TradeManager import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener import co.nilin.opex.accountant.ports.kafka.listener.spi.TradeListener import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent import kotlinx.coroutines.runBlocking -class AccountantTradeListener(private val tradeManager: TradeManager) : TradeListener { +class AccountantTradeListener( + private val tradeManager: TradeManager, + private val financialActionProcessor: FinancialActionProcessor +) : TradeListener { override fun id(): String { return "TradeListener" @@ -14,7 +18,8 @@ class AccountantTradeListener(private val tradeManager: TradeManager) : TradeLis override fun onEvent(event: TradeEvent, partition: Int, offset: Long, timestamp: Long) { runBlocking { - tradeManager.handleTrade(event) + val fa = tradeManager.handleTrade(event) + financialActionProcessor.process(fa) } } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt index 52f95ef56..145bc6fbe 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt @@ -1,5 +1,6 @@ package co.nilin.opex.accountant.app.listener +import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.OrderManager import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener @@ -7,7 +8,10 @@ import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent import kotlinx.coroutines.runBlocking import org.slf4j.LoggerFactory -class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequestListener { +class OrderListener( + private val orderManager: OrderManager, + private val financialManagerProcessor: FinancialActionProcessor, +) : OrderSubmitRequestListener { private val logger = LoggerFactory.getLogger(OrderListener::class.java) @@ -19,7 +23,7 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest runBlocking { logger.info("Order submit event received ${event.ouid}") - orderManager.handleRequestOrder( + val fa = orderManager.handleRequestOrder( SubmitOrderEvent( event.ouid, event.uuid, @@ -34,6 +38,8 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest event.userLevel ) ) + + financialManagerProcessor.process(fa) } } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt index b8230e70d..b495f2d89 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt @@ -1,8 +1,7 @@ package co.nilin.opex.accountant.app.scheduler -import co.nilin.opex.accountant.core.api.FinancialActionJobManager -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.runBlocking +import co.nilin.opex.accountant.core.api.FinancialActionProcessor +import kotlinx.coroutines.* import org.slf4j.LoggerFactory import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled @@ -10,20 +9,27 @@ import org.springframework.stereotype.Service @Service @Profile("scheduled") -class FinancialActionsJob(val financialActionJobManager: FinancialActionJobManager) { +class FinancialActionsJob(private val financialActionProcessor: FinancialActionProcessor) { private val log = LoggerFactory.getLogger(FinancialActionsJob::class.java) + private val scope = CoroutineScope(Dispatchers.IO) @Scheduled(fixedDelay = 10000) fun processFinancialActions() { - runBlocking(Dispatchers.IO) { + scope.ensureActive() + if (!scope.isCompleted()) + return + + scope.launch { try { //read unprocessed fa records and call transfer - financialActionJobManager.processFinancialActions(0, 100) + financialActionProcessor.batchProcess(0, 100) } catch (e: Exception) { - log.error("Job error!", e) + log.error("Financial action manager unable to batch process", e) } } } + private fun CoroutineScope.isCompleted() = coroutineContext.job.children.all { it.isCompleted } + } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt deleted file mode 100644 index bb9c26062..000000000 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt +++ /dev/null @@ -1,5 +0,0 @@ -package co.nilin.opex.accountant.core.api - -interface FinancialActionJobManager { - suspend fun processFinancialActions(offset: Long, size: Long) -} diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt new file mode 100644 index 000000000..c9d22d532 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt @@ -0,0 +1,12 @@ +package co.nilin.opex.accountant.core.api + +import co.nilin.opex.accountant.core.model.FinancialAction + +interface FinancialActionProcessor { + + suspend fun process(financialAction: FinancialAction) + + suspend fun process(financialActions: List) + + suspend fun batchProcess(offset: Long, size: Long) +} diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/FinancialActionEvent.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/FinancialActionEvent.kt new file mode 100644 index 000000000..83a33d1ba --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/FinancialActionEvent.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.accountant.core.inout + +import java.math.BigDecimal +import java.time.LocalDateTime + +data class FinancialActionEvent( + val uuid: String, + val symbol: String, + val amount: BigDecimal, + val sender: String, + val senderWalletType: String, + val receiver: String, + val receiverWalletType: String, + val createDate: LocalDateTime, + val transferRef: String?, + val description: String +) \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt index 96ff9a2c6..7b4589e20 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.core.model import java.math.BigDecimal import java.time.LocalDateTime +import java.util.UUID class FinancialAction( val parent: FinancialAction?, @@ -14,10 +15,19 @@ class FinancialAction( val receiver: String, val receiverWalletType: String, val createDate: LocalDateTime, - val retryCount: Int = 0, + val status: FinancialActionStatus = FinancialActionStatus.CREATED, + val uuid: String = UUID.randomUUID().toString(), val id: Long? = null ) { + override fun equals(other: Any?): Boolean { + if (other == null || other !is FinancialAction) return false + return if (id != null && other.id != null) + id == other.id + else + uuid == other.uuid + } + override fun toString(): String { return "FinancialAction(id=$id, parent=$parent, eventType='$eventType', pointer='$pointer', symbol='$symbol', amount=$amount, sender='$sender', senderWalletType='$senderWalletType', receiver='$receiver', receiverWalletType='$receiverWalletType', createDate=$createDate)" } diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt index 944bb4a22..27db01828 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt @@ -1,6 +1,5 @@ package co.nilin.opex.accountant.core.service -import co.nilin.opex.accountant.core.api.FinancialActionJobManager import co.nilin.opex.accountant.core.inout.TransferRequest import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.FinancialActionStatus @@ -9,15 +8,16 @@ import co.nilin.opex.accountant.core.spi.FinancialActionPersister import co.nilin.opex.accountant.core.spi.WalletProxy import org.slf4j.LoggerFactory +@Deprecated("We are using kafka for processing now") class FinancialActionJobManagerImpl( private val financialActionLoader: FinancialActionLoader, private val financialActionPersister: FinancialActionPersister, private val walletProxy: WalletProxy -) : FinancialActionJobManager { +) { private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java) - override suspend fun processFinancialActions(offset: Long, size: Long) { + suspend fun processFinancialActions(offset: Long, size: Long) { val factions = financialActionLoader.loadUnprocessed(offset, size) val flatten = sortAndFlattenFA(factions) logger.info("Loaded ${flatten.size} factions: ${flatten.map { it.id }}") diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt new file mode 100644 index 000000000..65cba6092 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt @@ -0,0 +1,59 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.api.FinancialActionProcessor +import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.FinancialActionStatus +import co.nilin.opex.accountant.core.spi.FinancialActionLoader +import co.nilin.opex.accountant.core.spi.FinancialActionPersister +import co.nilin.opex.accountant.core.spi.FinancialActionPublisher +import org.slf4j.LoggerFactory + +class FinancialActionProcessorImpl( + private val financialActionLoader: FinancialActionLoader, + private val financialActionPersister: FinancialActionPersister, + private val financialActionPublisher: FinancialActionPublisher +) : FinancialActionProcessor { + + private val logger = LoggerFactory.getLogger(FinancialActionProcessor::class.java) + + override suspend fun process(financialAction: FinancialAction) { + val list = ArrayList() + extractParents(financialAction, list) + process(list) + } + + override suspend fun process(financialActions: List) { + for (i in financialActions.indices) { + val fa = financialActions[i] + try { + if (fa.status == FinancialActionStatus.CREATED) + submitEvent(fa) + } catch (e: Exception) { + logger.error("Unable to submit financial action $fa", e) + break + } + financialActionPersister.updateStatus(fa.uuid, FinancialActionStatus.PROCESSED) + } + } + + override suspend fun batchProcess(offset: Long, size: Long) { + val factions = financialActionLoader.loadUnprocessed(offset, size) + val list = ArrayList() + factions.forEach { extractParents(it, list) } + process(list) + } + + private suspend fun submitEvent(financialAction: FinancialAction) { + financialActionPublisher.publish(financialAction) + } + + fun extractParents(financialAction: FinancialAction, list: ArrayList) { + if (financialAction.parent != null) { + extractParents(financialAction.parent, list) + } + + if (!list.contains(financialAction)) + list.add(financialAction) + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt index 5c4bec9f9..89ed49e7a 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt @@ -9,7 +9,6 @@ import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent -import co.nilin.opex.matching.engine.core.model.OrderDirection import org.slf4j.LoggerFactory import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt index e10e32b5c..72fe8cd77 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt @@ -3,7 +3,7 @@ package co.nilin.opex.accountant.core.spi import co.nilin.opex.accountant.core.model.FinancialAction interface FinancialActionLoader { - suspend fun findLast(uuid: String, ouid: String): FinancialAction? + suspend fun findLast(userUuid: String, ouid: String): FinancialAction? suspend fun loadUnprocessed(offset: Long, size: Long): List - suspend fun countUnprocessed(uuid: String, symbol: String, eventType: String): Long + suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt index 6b66dc4c3..8da7fbfef 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt @@ -7,7 +7,11 @@ interface FinancialActionPersister { suspend fun persist(financialActions: List): List + suspend fun persistWithStatus(financialAction: FinancialAction, status: FinancialActionStatus) + suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) + suspend fun updateStatus(faUuid: String, status: FinancialActionStatus) + suspend fun updateBatchStatus(financialAction: List, status: FinancialActionStatus) } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPublisher.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPublisher.kt new file mode 100644 index 000000000..bc36bc2ab --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPublisher.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.accountant.core.spi + +import co.nilin.opex.accountant.core.model.FinancialAction + +interface FinancialActionPublisher { + suspend fun publish(fa: FinancialAction) +} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt new file mode 100644 index 000000000..cccad8abb --- /dev/null +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt @@ -0,0 +1,115 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.FinancialActionStatus +import co.nilin.opex.accountant.core.spi.FinancialActionLoader +import co.nilin.opex.accountant.core.spi.FinancialActionPersister +import co.nilin.opex.accountant.core.spi.FinancialActionPublisher +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.mockk +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import java.time.LocalDateTime +import org.assertj.core.api.Assertions.assertThat + +class FAManagerImplTest { + + private val financialActionLoader = mockk() + private val financialActionPersister = mockk() + private val financialActionPublisher = mockk() + + private val sut = FinancialActionProcessorImpl( + financialActionLoader, + financialActionPersister, + financialActionPublisher + ) + + init { + coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns arrayListOf(fa2, fa1, fa5, fa3, fa4) + coEvery { financialActionPersister.persist(any()) } returns arrayListOf() + coEvery { financialActionPersister.persistWithStatus(any(), any()) } returns Unit + coEvery { financialActionPublisher.publish(any()) } returns Unit + } + + @Test + fun givenFALoaded_validateParentsAreFirstInLine(): Unit = runBlocking { + val list = arrayListOf() + sut.extractParents(fa5, list) + + assertThat(list.size).isEqualTo(4) + assertThat(list[0].id).isEqualTo(1) + assertThat(list[1].id).isEqualTo(2) + assertThat(list[2].id).isEqualTo(3) + assertThat(list[3].id).isEqualTo(5) + } + + companion object { + private val fa1 = FinancialAction( + null, + "", + "", + "BTCUSDT", + 1.0.toBigDecimal(), + "w1", + "main", + "w2", + "exchange", + LocalDateTime.now(), + id = 1 + ) + private val fa2 = FinancialAction( + fa1, + "", + "", + "BTCUSDT", + 2.0.toBigDecimal(), + "w2", + "exchange", + "w1", + "main", + LocalDateTime.now(), + id = 2 + ) + private val fa3 = FinancialAction( + fa2, + "", + "", + "BTCUSDT", + 1.5.toBigDecimal(), + "w2", + "main", + "w2", + "exchange", + LocalDateTime.now(), + id = 3 + ) + private val fa4 = FinancialAction( + fa2, + "", + "", + "BTCUSDT", + 1.0.toBigDecimal(), + "w1", + "main", + "w1", + "exchange", + LocalDateTime.now(), + id = 4 + ) + private val fa5 = FinancialAction( + fa3, + "", + "", + "BTCUSDT", + 1.1.toBigDecimal(), + "w1", + "main", + "w3", + "exchange", + LocalDateTime.now(), + id = 5 + ) + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index 201fdc255..5433306ea 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -23,7 +23,6 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.mockito.kotlin.any import java.math.BigDecimal -import java.time.LocalDateTime internal class OrderManagerImplTest { diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt index d54b99029..ae5b5930c 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt @@ -39,7 +39,7 @@ object Valid { "system", "main", currentTime, - 15 + id = 15 ) var makerOrder = Order( diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt index 4ef86a1fa..e682148f9 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt @@ -15,7 +15,7 @@ import java.math.BigDecimal interface FinancialActionRepository : ReactiveCrudRepository { @Query("select * from fi_actions fi where pointer = :ouid and :uuid in (fi.sender, fi.receiver)") - fun findByOuidAndUuid( + fun findByOuidAndUserUuid( @Param("ouid") ouid: String, @Param("uuid") uuid: String, paging: Pageable @@ -33,11 +33,11 @@ interface FinancialActionRepository : ReactiveCrudRepository @Query("update fi_actions set status = :status where id = :id") - fun updateStatus(@Param("id") id: Long, @Param("status") status: FinancialActionStatus) + fun updateStatus(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono - @Query("update fi_actions set status = :status, retry_count = retry_count + 1 where id = :id") - fun updateStatusAndIncreaseRetry(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono + @Query("update fi_actions set status = :status where uuid = :uuid") + fun updateStatus(uuid: String, status: FinancialActionStatus): Mono @Query("update fi_actions set status = :status where id in (:ids)") - fun updateStatus(ids: List, status: FinancialActionStatus): Mono + fun updateBatchStatus(ids: List, status: FinancialActionStatus): Mono } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt index 643a6f80e..34d5c35aa 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt @@ -25,16 +25,16 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe .toList() } - override suspend fun findLast(uuid: String, ouid: String): FinancialAction? { - return financialActionRepository.findByOuidAndUuid( - ouid, uuid, PageRequest.of(0, 1, Sort.by(Sort.Direction.DESC, "createDate")) + override suspend fun findLast(userUuid: String, ouid: String): FinancialAction? { + return financialActionRepository.findByOuidAndUserUuid( + ouid, userUuid, PageRequest.of(0, 1, Sort.by(Sort.Direction.DESC, "createDate")) ).map { loadFinancialAction(it.id) } .firstOrNull() } - override suspend fun countUnprocessed(uuid: String, symbol: String, eventType: String): Long { + override suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long { return financialActionRepository.findByUuidAndSymbolAndEventTypeAndStatus( - uuid, + userUuid, symbol, eventType, FinancialActionStatus.CREATED @@ -55,7 +55,8 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe fim.receiver, fim.receiverWalletType, fim.createDate, - fim.retryCount, + fim.status, + fim.uuid, fim.id ) } diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt index 7cb5bb08a..b9fd7e1aa 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt @@ -7,6 +7,7 @@ import co.nilin.opex.accountant.ports.postgres.dao.FinancialActionRepository import co.nilin.opex.accountant.ports.postgres.model.FinancialActionModel import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactor.awaitSingle +import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.stereotype.Component @Component @@ -17,6 +18,7 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ financialActionRepository.saveAll(financialActions.map { FinancialActionModel( null, + it.uuid, it.parent?.id, it.eventType, it.pointer, @@ -34,11 +36,39 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ return financialActions } + override suspend fun persistWithStatus(financialAction: FinancialAction, status: FinancialActionStatus) { + financialActionRepository.save( + with(financialAction) { + FinancialActionModel( + null, + uuid, + parent?.id, + eventType, + pointer, + symbol, + amount, + sender, + senderWalletType, + receiver, + receiverWalletType, + "", + "", + createDate, + status + ) + } + ).awaitSingle() + } + override suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) { - financialActionRepository.updateStatusAndIncreaseRetry(financialAction.id!!, status).awaitFirstOrNull() + financialActionRepository.updateStatus(financialAction.id!!, status).awaitSingleOrNull() + } + + override suspend fun updateStatus(faUuid: String, status: FinancialActionStatus) { + financialActionRepository.updateStatus(faUuid, status).awaitSingleOrNull() } override suspend fun updateBatchStatus(financialAction: List, status: FinancialActionStatus) { - financialActionRepository.updateStatus(financialAction.mapNotNull { it.id }, status).awaitFirstOrNull() + financialActionRepository.updateBatchStatus(financialAction.mapNotNull { it.id }, status).awaitFirstOrNull() } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt index e7f5ac076..497c87f32 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt @@ -10,6 +10,7 @@ import java.time.LocalDateTime @Table("fi_actions") data class FinancialActionModel( @Id var id: Long?, + val uuid: String, @Column("parent_id") var parentId: Long?, @Column("event_type") val eventType: String, val pointer: String, @@ -22,9 +23,7 @@ data class FinancialActionModel( val agent: String, val ip: String, @Column("create_date") val createDate: LocalDateTime, - val status: FinancialActionStatus = FinancialActionStatus.CREATED, - @Column("retry_count") val retryCount: Int = 0, - @Column("last_try_date") val lastTryDate: LocalDateTime? = null + val status: FinancialActionStatus = FinancialActionStatus.CREATED ) diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql index 762694d1a..32c0aa5d2 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql @@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS orders CREATE TABLE IF NOT EXISTS fi_actions ( id SERIAL PRIMARY KEY, + uuid VARCHAR(72) NOT NULL UNIQUE, parent_id INTEGER, event_type VARCHAR(72) NOT NULL, pointer VARCHAR(72) NOT NULL, @@ -42,9 +43,7 @@ CREATE TABLE IF NOT EXISTS fi_actions agent VARCHAR(20), ip VARCHAR(11), create_date TIMESTAMP NOT NULL, - status VARCHAR(20), - retry_count DECIMAL, - last_try_date TIMESTAMP + status VARCHAR(20) ); CREATE TABLE IF NOT EXISTS pair_config diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt index 5813cb072..65f723937 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt @@ -17,7 +17,7 @@ class FAPersisterImplTest { private val financialActionRepository = mockk { coEvery { saveAll(any() as Iterable) } returns Flux.just(Valid.faModel) - coEvery { updateStatusAndIncreaseRetry(any(), any()) } returns Mono.empty() + coEvery { updateBatchStatus(any(), any()) } returns Mono.empty() } private val faPersister = FinancialActionPersisterImpl(financialActionRepository) @@ -29,9 +29,10 @@ class FAPersisterImplTest { @Test fun givenFAAndStatus_whenUpdatingStatusAndFANotFound_throwException(): Unit = runBlocking { + coEvery { financialActionRepository.updateStatus(eq(Valid.fa.id!!), any()) } returns Mono.empty() faPersister.updateStatus(Valid.fa, FinancialActionStatus.CREATED) coVerify { - financialActionRepository.updateStatusAndIncreaseRetry( + financialActionRepository.updateStatus( eq(Valid.fa.id!!), eq(FinancialActionStatus.CREATED) ) diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt index 9931a14ff..6d98cc55c 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt @@ -14,6 +14,7 @@ import co.nilin.opex.matching.engine.core.model.OrderType import co.nilin.opex.matching.engine.core.model.Pair import java.math.BigDecimal import java.time.LocalDateTime +import java.util.UUID object Valid { @@ -128,12 +129,13 @@ object Valid { "system", "main", currentTime, - 1, - 1 + id = 1, + uuid = "uuid" ) val faModel = FinancialActionModel( null, + "uuid", null, TradeEvent::class.java.name, "trade_id", @@ -145,7 +147,7 @@ object Valid { "main", "", "", - currentTime + currentTime, ) } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt index b8ca2965f..d323a4547 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt @@ -35,6 +35,14 @@ class KafkaTopicConfig { .build() }) + registerBean("topic_fiAction", NewTopic::class.java, Supplier { + TopicBuilder.name("fiAction") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1) } logger.info("Kafka topics created") diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt index 800a82105..ada140c75 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt @@ -1,7 +1,9 @@ package co.nilin.opex.accountant.ports.kafka.submitter.config +import co.nilin.opex.accountant.core.inout.FinancialActionEvent import co.nilin.opex.accountant.core.inout.RichOrderEvent import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer @@ -27,7 +29,7 @@ class SubmitterKafkaConfig { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", - JsonSerializer.TYPE_MAPPINGS to "rich_order_event:co.nilin.opex.accountant.core.inout.RichOrderEvent,rich_order:co.nilin.opex.accountant.core.inout.RichOrder,rich_order_update:co.nilin.opex.accountant.core.inout.RichOrderUpdate, rich_trade:co.nilin.opex.accountant.core.inout.RichTrade" + JsonSerializer.TYPE_MAPPINGS to "rich_order_event:co.nilin.opex.accountant.core.inout.RichOrderEvent,rich_order:co.nilin.opex.accountant.core.inout.RichOrder,rich_order_update:co.nilin.opex.accountant.core.inout.RichOrderUpdate, rich_trade:co.nilin.opex.accountant.core.inout.RichTrade,financial_action:co.nilin.opex.accountant.core.inout.FinancialActionEvent" //ProducerConfig.CLIENT_ID_CONFIG to "", omitting this option as it produces InstanceAlreadyExistsException ) } @@ -61,4 +63,14 @@ class SubmitterKafkaConfig { fun richOrderKafkaTemplate(@Qualifier("richOrderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } + + @Bean("fiActionProducerFactory") + fun fiActionProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("fiActionKafkaTemplate") + fun fiActionKafkaTemplate(@Qualifier("fiActionProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/FinancialActionSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/FinancialActionSubmitter.kt new file mode 100644 index 000000000..cbd66dd8b --- /dev/null +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/FinancialActionSubmitter.kt @@ -0,0 +1,49 @@ +package co.nilin.opex.accountant.ports.kafka.submitter.service + +import co.nilin.opex.accountant.core.inout.FinancialActionEvent +import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.spi.FinancialActionPublisher +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Component +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +@Component +class FinancialActionSubmitter( + @Qualifier("fiActionKafkaTemplate") + private val kafkaTemplate: KafkaTemplate +) : FinancialActionPublisher, EventPublisher { + + private val logger = LoggerFactory.getLogger(FinancialActionSubmitter::class.java) + override val topic: String = "fiAction" + + override suspend fun publish(fa: FinancialAction): Unit = suspendCoroutine { cont -> + logger.info("Sending financial action event") + val sendFuture = kafkaTemplate.send( + topic, + with(fa) { + FinancialActionEvent( + uuid, + symbol, + amount, + sender, + senderWalletType, + receiver, + receiverWalletType, + createDate, + null, + eventType + pointer + ) + } + ) + sendFuture.addCallback({ + cont.resume(Unit) + }, { + logger.error("Error submitting financial action ${fa.uuid}", it) + cont.resumeWithException(it) + }) + } +} \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt index 445c8502a..6ffd2b2cb 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt @@ -1,8 +1,11 @@ package co.nilin.opex.wallet.app.config import co.nilin.opex.wallet.ports.kafka.listener.consumer.AdminEventKafkaListener +import co.nilin.opex.wallet.ports.kafka.listener.consumer.FinancialActionKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.consumer.UserCreatedKafkaListener +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent import co.nilin.opex.wallet.ports.kafka.listener.spi.AdminEventListener +import co.nilin.opex.wallet.ports.kafka.listener.spi.FinancialActionEventListener import co.nilin.opex.wallet.ports.kafka.listener.spi.UserCreatedEventListener import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.SerializationFeature @@ -22,9 +25,12 @@ class AppConfig { userCreatedEventListener: UserCreatedEventListener, adminKafkaEventListener: AdminEventKafkaListener, adminEventListener: AdminEventListener, + financialActionKafkaListener: FinancialActionKafkaListener, + financialActionEventListener: FinancialActionEventListener, ) { useCreatedKafkaListener.addEventListener(userCreatedEventListener) adminKafkaEventListener.addEventListener(adminEventListener) + financialActionKafkaListener.addEventListener(financialActionEventListener) } @Bean diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/FinancialActionEventListenerImpl.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/FinancialActionEventListenerImpl.kt new file mode 100644 index 000000000..ba0a3e425 --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/FinancialActionEventListenerImpl.kt @@ -0,0 +1,35 @@ +package co.nilin.opex.wallet.app.listener + +import co.nilin.opex.wallet.app.service.TransferService +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.FinancialActionEventListener +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class FinancialActionEventListenerImpl(private val transferService: TransferService) : FinancialActionEventListener { + + private val logger = LoggerFactory.getLogger(FinancialActionEventListenerImpl::class.java) + + override fun id(): String { + return "FinancialActionEventListener" + } + + override fun onEvent(event: FinancialActionEvent, partition: Int, offset: Long, timestamp: Long) { + logger.info("On FinancialActionEvent ${event.uuid}") + runBlocking(Dispatchers.IO) { + transferService.transfer( + event.symbol, + event.senderWalletType, + event.sender, + event.receiverWalletType, + event.receiver, + event.amount, + event.description, + event.transferRef + ) + } + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt index 574120a10..391904973 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -1,6 +1,7 @@ package co.nilin.opex.wallet.ports.kafka.listener.config import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer @@ -39,6 +40,16 @@ class KafkaProducerConfig { return KafkaTemplate(factory) } + @Bean("financialActionProducerFactory") + fun financialActionProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("financialActionKafkaTemplate") + fun financialActionKafkaTemplate(factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + @Bean fun adminProducerFactory(@Qualifier("producerConfigs") config: Map): ProducerFactory { return DefaultKafkaProducerFactory(config) diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt index e89b86978..0c48ed5f9 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt @@ -1,8 +1,10 @@ package co.nilin.opex.wallet.ports.kafka.listener.config import co.nilin.opex.wallet.ports.kafka.listener.consumer.AdminEventKafkaListener +import co.nilin.opex.wallet.ports.kafka.listener.consumer.FinancialActionKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.consumer.UserCreatedKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition @@ -38,7 +40,7 @@ class WalletKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "user_created_event:co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent,admin_add_currency:co.nilin.opex.wallet.ports.kafka.listener.model.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.wallet.ports.kafka.listener.model.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.wallet.ports.kafka.listener.model.DeleteCurrencyEvent" + JsonDeserializer.TYPE_MAPPINGS to "user_created_event:co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent,admin_add_currency:co.nilin.opex.wallet.ports.kafka.listener.model.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.wallet.ports.kafka.listener.model.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.wallet.ports.kafka.listener.model.DeleteCurrencyEvent,financial_action:co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent" ) } @@ -47,6 +49,11 @@ class WalletKafkaConfig { return DefaultKafkaConsumerFactory(consumerConfigs) } + @Bean("financialActionConsumerFactory") + fun financialActionConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } + @Bean fun adminEventsConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) @@ -67,6 +74,21 @@ class WalletKafkaConfig { container.start() } + @Autowired + @ConditionalOnBean(FinancialActionKafkaListener::class) + fun configureFinancialActionListener( + listener: FinancialActionKafkaListener, + template: KafkaTemplate, + @Qualifier("financialActionConsumerFactory") consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile("fiAction")) + containerProps.messageListener = listener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("FinancialActionKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "fiAction.DLT") + container.start() + } + @Autowired @ConditionalOnBean(AdminEventKafkaListener::class) fun configureAdminEventListener( diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/FinancialActionKafkaListener.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/FinancialActionKafkaListener.kt new file mode 100644 index 000000000..08a4e196c --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/FinancialActionKafkaListener.kt @@ -0,0 +1,24 @@ +package co.nilin.opex.wallet.ports.kafka.listener.consumer + +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.FinancialActionEventListener +import co.nilin.opex.wallet.ports.kafka.listener.spi.UserCreatedEventListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component + +@Component +class FinancialActionKafkaListener : MessageListener { + + val eventListeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + eventListeners.forEach { + it.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) + } + } + + fun addEventListener(tl: FinancialActionEventListener) { + eventListeners.add(tl) + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/FinancialActionEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/FinancialActionEvent.kt new file mode 100644 index 000000000..ceee87c78 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/FinancialActionEvent.kt @@ -0,0 +1,46 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +import java.math.BigDecimal +import java.time.LocalDateTime + +class FinancialActionEvent { + + lateinit var uuid: String + lateinit var symbol: String + lateinit var amount: BigDecimal + lateinit var sender: String + lateinit var senderWalletType: String + lateinit var receiver: String + lateinit var receiverWalletType: String + lateinit var createDate: LocalDateTime + var transferRef: String? = null + lateinit var description: String + + constructor() { + + } + + constructor( + uuid: String, + symbol: String, + amount: BigDecimal, + sender: String, + senderWalletType: String, + receiver: String, + receiverWalletType: String, + createDate: LocalDateTime, + transferRef: String, + description: String + ) { + this.uuid = uuid + this.symbol = symbol + this.amount = amount + this.sender = sender + this.senderWalletType = senderWalletType + this.receiver = receiver + this.receiverWalletType = receiverWalletType + this.createDate = createDate + this.transferRef = transferRef + this.description = description + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/FinancialActionEventListener.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/FinancialActionEventListener.kt new file mode 100644 index 000000000..908b63cfd --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/FinancialActionEventListener.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.wallet.ports.kafka.listener.spi + +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent + +interface FinancialActionEventListener { + + fun id(): String + + fun onEvent(event: FinancialActionEvent, partition: Int, offset: Long, timestamp: Long) + +} \ No newline at end of file From cd1cbd76284b06a16b8aa9427de48669e2dda000 Mon Sep 17 00:00:00 2001 From: Peyman Date: Tue, 7 Mar 2023 19:08:53 +0330 Subject: [PATCH 2/6] Downgrade vault version --- docker-images/vault/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-images/vault/Dockerfile b/docker-images/vault/Dockerfile index a4e183276..38d4f35fa 100644 --- a/docker-images/vault/Dockerfile +++ b/docker-images/vault/Dockerfile @@ -1,4 +1,4 @@ -FROM vault:1.12.2 +FROM vault:1.11.7 COPY ["backend-policy.hcl", "panel-policy.hcl", "vault.json", "workflow-vault.sh", "/vault/config/"] EXPOSE 8200 ENTRYPOINT /vault/config/workflow-vault.sh From 07eeade426bf58740598436de088968ba4d2e594 Mon Sep 17 00:00:00 2001 From: Peyman Date: Sat, 11 Mar 2023 17:44:35 +0330 Subject: [PATCH 3/6] Action processor inside ordermanager and trademanager --- .../opex/accountant/app/config/AppConfig.kt | 28 +++++++++---------- .../listener/AccountantTempEventListener.kt | 7 ++--- .../app/listener/AccountantTradeListener.kt | 10 ++----- .../accountant/app/listener/OrderListener.kt | 10 ++----- .../app/scheduler/FinancialActionsJob.kt | 2 +- .../core/service/OrderManagerImpl.kt | 7 ++++- .../core/service/TradeManagerImpl.kt | 8 ++++-- .../core/service/OrderManagerImplTest.kt | 6 +++- .../core/service/TradeManagerImplTest.kt | 11 ++++++-- 9 files changed, 47 insertions(+), 42 deletions(-) diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index de2a219d5..9c0ce90c3 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -26,7 +26,7 @@ import org.springframework.scheduling.annotation.EnableScheduling class AppConfig { @Bean - fun getFinancialActionJobManager( + fun financialActionProcessor( financialActionLoader: FinancialActionLoader, financialActionPersister: FinancialActionPersister, financialActionPublisher: FinancialActionPublisher @@ -47,7 +47,8 @@ class AppConfig { orderPersister: OrderPersister, tempEventPersister: TempEventPersister, tempEventRepublisher: TempEventRepublisher, - richOrderPublisher: RichOrderPublisher + richOrderPublisher: RichOrderPublisher, + financialActionProcessor: FinancialActionProcessor ): OrderManager { return OrderManagerImpl( pairConfigLoader, @@ -56,7 +57,8 @@ class AppConfig { financeActionLoader, orderPersister, tempEventPersister, - richOrderPublisher + richOrderPublisher, + financialActionProcessor ) } @@ -69,6 +71,7 @@ class AppConfig { richTradePublisher: RichTradePublisher, richOrderPublisher: RichOrderPublisher, feeCalculator: FeeCalculator, + financialActionProcessor: FinancialActionProcessor ): TradeManager { return TradeManagerImpl( financeActionPersister, @@ -77,21 +80,19 @@ class AppConfig { tempEventPersister, richTradePublisher, richOrderPublisher, - feeCalculator + feeCalculator, + financialActionProcessor ) } @Bean - fun orderListener(orderManager: OrderManager, financialActionProcessor: FinancialActionProcessor): OrderListener { - return OrderListener(orderManager, financialActionProcessor) + fun orderListener(orderManager: OrderManager): OrderListener { + return OrderListener(orderManager) } @Bean - fun accountantTradeListener( - tradeManager: TradeManager, - financialActionProcessor: FinancialActionProcessor - ): AccountantTradeListener { - return AccountantTradeListener(tradeManager, financialActionProcessor) + fun accountantTradeListener(tradeManager: TradeManager): AccountantTradeListener { + return AccountantTradeListener(tradeManager) } @Bean @@ -102,10 +103,9 @@ class AppConfig { @Bean fun accountantTempEventListener( orderManager: OrderManager, - tradeManager: TradeManager, - financialActionProcessor: FinancialActionProcessor + tradeManager: TradeManager ): AccountantTempEventListener { - return AccountantTempEventListener(orderManager, tradeManager, financialActionProcessor) + return AccountantTempEventListener(orderManager, tradeManager) } @Autowired diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt index 7bea85fab..1d559801f 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt @@ -1,6 +1,5 @@ package co.nilin.opex.accountant.app.listener -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.OrderManager import co.nilin.opex.accountant.core.api.TradeManager import co.nilin.opex.accountant.ports.kafka.listener.spi.TempEventListener @@ -10,8 +9,7 @@ import org.slf4j.LoggerFactory class AccountantTempEventListener( private val orderManager: OrderManager, - private val tradeManager: TradeManager, - private val financialActionProcessor: FinancialActionProcessor + private val tradeManager: TradeManager ) : TempEventListener { private val logger = LoggerFactory.getLogger(AccountantTempEventListener::class.java) @@ -23,7 +21,7 @@ class AccountantTempEventListener( override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) { logger.info("TempEvent received $event") runBlocking { - val fa = when (event) { + when (event) { is CreateOrderEvent -> orderManager.handleNewOrder(event) is RejectOrderEvent -> orderManager.handleRejectOrder(event) is UpdatedOrderEvent -> orderManager.handleUpdateOrder(event) @@ -33,7 +31,6 @@ class AccountantTempEventListener( throw IllegalArgumentException("Event is not accepted ${event::class.java}") } } - financialActionProcessor.process(fa) } logger.info("TempEvent processed") } diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt index bf93778f8..650db2f64 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt @@ -1,16 +1,11 @@ package co.nilin.opex.accountant.app.listener -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.TradeManager -import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener import co.nilin.opex.accountant.ports.kafka.listener.spi.TradeListener import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent import kotlinx.coroutines.runBlocking -class AccountantTradeListener( - private val tradeManager: TradeManager, - private val financialActionProcessor: FinancialActionProcessor -) : TradeListener { +class AccountantTradeListener(private val tradeManager: TradeManager) : TradeListener { override fun id(): String { return "TradeListener" @@ -18,8 +13,7 @@ class AccountantTradeListener( override fun onEvent(event: TradeEvent, partition: Int, offset: Long, timestamp: Long) { runBlocking { - val fa = tradeManager.handleTrade(event) - financialActionProcessor.process(fa) + tradeManager.handleTrade(event) } } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt index 145bc6fbe..52f95ef56 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt @@ -1,6 +1,5 @@ package co.nilin.opex.accountant.app.listener -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.OrderManager import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener @@ -8,10 +7,7 @@ import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent import kotlinx.coroutines.runBlocking import org.slf4j.LoggerFactory -class OrderListener( - private val orderManager: OrderManager, - private val financialManagerProcessor: FinancialActionProcessor, -) : OrderSubmitRequestListener { +class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequestListener { private val logger = LoggerFactory.getLogger(OrderListener::class.java) @@ -23,7 +19,7 @@ class OrderListener( runBlocking { logger.info("Order submit event received ${event.ouid}") - val fa = orderManager.handleRequestOrder( + orderManager.handleRequestOrder( SubmitOrderEvent( event.ouid, event.uuid, @@ -38,8 +34,6 @@ class OrderListener( event.userLevel ) ) - - financialManagerProcessor.process(fa) } } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt index b495f2d89..2e94d3310 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt @@ -14,7 +14,7 @@ class FinancialActionsJob(private val financialActionProcessor: FinancialActionP private val log = LoggerFactory.getLogger(FinancialActionsJob::class.java) private val scope = CoroutineScope(Dispatchers.IO) - @Scheduled(fixedDelay = 10000) + @Scheduled(fixedDelay = 10000, initialDelay = 10000) fun processFinancialActions() { scope.ensureActive() if (!scope.isCompleted()) diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index 9c840bf73..90e9f0709 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -1,5 +1,6 @@ package co.nilin.opex.accountant.core.service +import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.OrderManager import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.inout.RichOrder @@ -20,7 +21,8 @@ open class OrderManagerImpl( private val financeActionLoader: FinancialActionLoader, private val orderPersister: OrderPersister, private val tempEventPersister: TempEventPersister, - private val richOrderPublisher: RichOrderPublisher + private val richOrderPublisher: RichOrderPublisher, + private val financialActionProcessor: FinancialActionProcessor ) : OrderManager { @Transactional @@ -94,6 +96,7 @@ open class OrderManagerImpl( OrderStatus.REQUESTED.code ) ) + financialActionProcessor.process(financialAction) return financialActionPersister.persist(listOf(financialAction)) } @@ -157,6 +160,7 @@ open class OrderManagerImpl( OrderStatus.REJECTED ) ) + financialActionProcessor.process(financialAction) return financialActionPersister.persist(listOf(financialAction)) } @@ -201,6 +205,7 @@ open class OrderManagerImpl( OrderStatus.REJECTED ) ) + financialActionProcessor.process(financialAction) return financialActionPersister.persist(listOf(financialAction)) } diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt index 89ed49e7a..7b7451da9 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt @@ -1,6 +1,7 @@ package co.nilin.opex.accountant.core.service import co.nilin.opex.accountant.core.api.FeeCalculator +import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.TradeManager import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.inout.RichOrderUpdate @@ -21,7 +22,8 @@ open class TradeManagerImpl( private val tempEventPersister: TempEventPersister, private val richTradePublisher: RichTradePublisher, private val richOrderPublisher: RichOrderPublisher, - private val feeCalculator: FeeCalculator + private val feeCalculator: FeeCalculator, + private val financialActionProcessor: FinancialActionProcessor ) : TradeManager { private val log = LoggerFactory.getLogger(TradeManagerImpl::class.java) @@ -167,7 +169,9 @@ open class TradeManagerImpl( trade.eventDate ) ) - return financeActionPersister.persist(financialActions) + return financeActionPersister.persist(financialActions).also { + financialActionProcessor.process(financialActions) + } } private suspend fun publishTakerRichOrderUpdate(takerOrder: Order, trade: TradeEvent) { diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index 5433306ea..35cfe70a2 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -1,5 +1,6 @@ package co.nilin.opex.accountant.core.service +import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.PairConfig @@ -33,6 +34,7 @@ internal class OrderManagerImplTest { private val pairConfigLoader = mockk() private val richOrderPublisher = mockk() private val userLevelLoader = mockk() + private val financialActionProcessor = mockk() private val orderManager = OrderManagerImpl( pairConfigLoader, @@ -41,7 +43,8 @@ internal class OrderManagerImplTest { financialActionLoader, orderPersister, tempEventPersister, - richOrderPublisher + richOrderPublisher, + financialActionProcessor ) init { @@ -52,6 +55,7 @@ internal class OrderManagerImplTest { coEvery { financialActionLoader.findLast(any(), any()) } returns null coEvery { financialActionPersister.persist(any()) } returnsArgument (0) coEvery { userLevelLoader.load(any()) } returns "*" + coEvery { financialActionProcessor.process(any()) } returns Unit } @Test diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt index 4e577b3cb..1fbca03d4 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt @@ -1,5 +1,7 @@ package co.nilin.opex.accountant.core.service +import co.nilin.opex.accountant.core.api.FinancialActionProcessor +import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.model.PairConfig import co.nilin.opex.accountant.core.model.PairFeeConfig @@ -27,6 +29,7 @@ internal class TradeManagerImplTest { private val richOrderPublisher = mockk() private val richTradePublisher = mockk() private val userLevelLoader = mockk() + private val financialActionProcessor = mockk() private val orderManager = OrderManagerImpl( pairConfigLoader, @@ -35,7 +38,8 @@ internal class TradeManagerImplTest { financeActionLoader, orderPersister, tempEventPersister, - richOrderPublisher + richOrderPublisher, + financialActionProcessor ) private val tradeManager = TradeManagerImpl( @@ -45,7 +49,8 @@ internal class TradeManagerImplTest { tempEventPersister, richTradePublisher, richOrderPublisher, - FeeCalculatorImpl("0x0") + FeeCalculatorImpl("0x0"), + financialActionProcessor ) init { @@ -55,6 +60,8 @@ internal class TradeManagerImplTest { coEvery { richOrderPublisher.publish(any()) } returns Unit coEvery { richTradePublisher.publish(any()) } returns Unit coEvery { userLevelLoader.load(any()) } returns "*" + coEvery { financialActionProcessor.process(any()) } returns Unit + coEvery { financialActionProcessor.process(any>()) } returns Unit } @Test From 4ee18f51a595d0b10841b89f32c9fafe76d155b3 Mon Sep 17 00:00:00 2001 From: Peyman Date: Sat, 11 Mar 2023 18:59:55 +0330 Subject: [PATCH 4/6] Change publisher pathway --- .../opex/accountant/app/config/AppConfig.kt | 28 +---- .../app/scheduler/FinancialActionsJob.kt | 7 +- .../core/api/FinancialActionJobManager.kt | 5 + .../core/api/FinancialActionProcessor.kt | 12 -- .../service/FinancialActionJobManagerImpl.kt | 58 ++++----- .../service/FinancialActionProcessorImpl.kt | 59 --------- .../core/service/OrderManagerImpl.kt | 29 +++-- .../core/service/TradeManagerImpl.kt | 51 +++++--- .../core/service/FAJobManagerImplTest.kt | 58 --------- .../core/service/FAManagerImplTest.kt | 115 ------------------ .../core/service/OrderManagerImplTest.kt | 8 +- .../core/service/TradeManagerImplTest.kt | 11 +- 12 files changed, 106 insertions(+), 335 deletions(-) create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt delete mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt delete mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt delete mode 100644 accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt delete mode 100644 accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index 9c0ce90c3..30080a8c9 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -4,11 +4,8 @@ import co.nilin.opex.accountant.app.listener.AccountantEventListener import co.nilin.opex.accountant.app.listener.AccountantTempEventListener import co.nilin.opex.accountant.app.listener.AccountantTradeListener import co.nilin.opex.accountant.app.listener.OrderListener -import co.nilin.opex.accountant.core.api.FeeCalculator -import co.nilin.opex.accountant.core.api.FinancialActionProcessor -import co.nilin.opex.accountant.core.api.OrderManager -import co.nilin.opex.accountant.core.api.TradeManager -import co.nilin.opex.accountant.core.service.FinancialActionProcessorImpl +import co.nilin.opex.accountant.core.api.* +import co.nilin.opex.accountant.core.service.FinancialActionJobManagerImpl import co.nilin.opex.accountant.core.service.OrderManagerImpl import co.nilin.opex.accountant.core.service.TradeManagerImpl import co.nilin.opex.accountant.core.spi.* @@ -25,19 +22,6 @@ import org.springframework.scheduling.annotation.EnableScheduling @EnableScheduling class AppConfig { - @Bean - fun financialActionProcessor( - financialActionLoader: FinancialActionLoader, - financialActionPersister: FinancialActionPersister, - financialActionPublisher: FinancialActionPublisher - ): FinancialActionProcessor { - return FinancialActionProcessorImpl( - financialActionLoader, - financialActionPersister, - financialActionPublisher - ) - } - @Bean fun orderManager( pairConfigLoader: PairConfigLoader, @@ -48,7 +32,7 @@ class AppConfig { tempEventPersister: TempEventPersister, tempEventRepublisher: TempEventRepublisher, richOrderPublisher: RichOrderPublisher, - financialActionProcessor: FinancialActionProcessor + financialActionPublisher: FinancialActionPublisher, ): OrderManager { return OrderManagerImpl( pairConfigLoader, @@ -58,7 +42,7 @@ class AppConfig { orderPersister, tempEventPersister, richOrderPublisher, - financialActionProcessor + financialActionPublisher ) } @@ -71,7 +55,7 @@ class AppConfig { richTradePublisher: RichTradePublisher, richOrderPublisher: RichOrderPublisher, feeCalculator: FeeCalculator, - financialActionProcessor: FinancialActionProcessor + financialActionPublisher: FinancialActionPublisher, ): TradeManager { return TradeManagerImpl( financeActionPersister, @@ -81,7 +65,7 @@ class AppConfig { richTradePublisher, richOrderPublisher, feeCalculator, - financialActionProcessor + financialActionPublisher ) } diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt index 2e94d3310..08a643fb8 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt @@ -1,6 +1,5 @@ package co.nilin.opex.accountant.app.scheduler -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import kotlinx.coroutines.* import org.slf4j.LoggerFactory import org.springframework.context.annotation.Profile @@ -9,12 +8,12 @@ import org.springframework.stereotype.Service @Service @Profile("scheduled") -class FinancialActionsJob(private val financialActionProcessor: FinancialActionProcessor) { +class FinancialActionsJob() { private val log = LoggerFactory.getLogger(FinancialActionsJob::class.java) private val scope = CoroutineScope(Dispatchers.IO) - @Scheduled(fixedDelay = 10000, initialDelay = 10000) + //@Scheduled(fixedDelay = 10000, initialDelay = 10000) fun processFinancialActions() { scope.ensureActive() if (!scope.isCompleted()) @@ -23,7 +22,7 @@ class FinancialActionsJob(private val financialActionProcessor: FinancialActionP scope.launch { try { //read unprocessed fa records and call transfer - financialActionProcessor.batchProcess(0, 100) + //financialActionProcessor.batchProcess(0, 100) } catch (e: Exception) { log.error("Financial action manager unable to batch process", e) } diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt new file mode 100644 index 000000000..57458f849 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.accountant.core.api + +interface FinancialActionJobManager { + suspend fun processFinancialActions(offset: Long, size: Long) +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt deleted file mode 100644 index c9d22d532..000000000 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionProcessor.kt +++ /dev/null @@ -1,12 +0,0 @@ -package co.nilin.opex.accountant.core.api - -import co.nilin.opex.accountant.core.model.FinancialAction - -interface FinancialActionProcessor { - - suspend fun process(financialAction: FinancialAction) - - suspend fun process(financialActions: List) - - suspend fun batchProcess(offset: Long, size: Long) -} diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt index 27db01828..c79004997 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt @@ -1,58 +1,50 @@ package co.nilin.opex.accountant.core.service +import co.nilin.opex.accountant.core.api.FinancialActionJobManager import co.nilin.opex.accountant.core.inout.TransferRequest import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.spi.FinancialActionLoader import co.nilin.opex.accountant.core.spi.FinancialActionPersister +import co.nilin.opex.accountant.core.spi.FinancialActionPublisher import co.nilin.opex.accountant.core.spi.WalletProxy import org.slf4j.LoggerFactory -@Deprecated("We are using kafka for processing now") class FinancialActionJobManagerImpl( private val financialActionLoader: FinancialActionLoader, private val financialActionPersister: FinancialActionPersister, - private val walletProxy: WalletProxy -) { + private val financialActionPublisher: FinancialActionPublisher, +) : FinancialActionJobManager { private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java) - suspend fun processFinancialActions(offset: Long, size: Long) { + override suspend fun processFinancialActions(offset: Long, size: Long) { val factions = financialActionLoader.loadUnprocessed(offset, size) - val flatten = sortAndFlattenFA(factions) - logger.info("Loaded ${flatten.size} factions: ${flatten.map { it.id }}") - if (factions.isEmpty()) - return + publishFinancialActions(factions) + } - try { - val requests = factions.map { - TransferRequest( - it.amount, - it.symbol, - it.sender, - it.senderWalletType, - it.receiver, - it.receiverWalletType, - null, - it.eventType + it.pointer - ) + private suspend fun publishFinancialActions(financialActions: List) { + val list = arrayListOf() + financialActions.forEach { extractFAParents(it, list) } + for (fa in list) { + if (fa.status == FinancialActionStatus.CREATED) { + try { + financialActionPublisher.publish(fa) + } catch (e: Exception) { + logger.error("Cannot publish fa ${fa.uuid}", e) + break + } + financialActionPersister.updateStatus(fa, FinancialActionStatus.PROCESSED) } - walletProxy.batchTransfer(requests) - financialActionPersister.updateBatchStatus(factions, FinancialActionStatus.PROCESSED) - } catch (e: Exception) { - logger.error("financial job error", e) } } - fun sortAndFlattenFA(list: List): Collection { - val result = arrayListOf() - - fun extractParent(fa: FinancialAction) { - if (fa.parent != null) - extractParent(fa.parent) - result.add(fa) + private fun extractFAParents(financialAction: FinancialAction, list: ArrayList) { + if (financialAction.parent != null) { + extractFAParents(financialAction.parent, list) } - list.forEach { extractParent(it) } - return result.distinctBy { it.id } + + if (!list.contains(financialAction)) + list.add(financialAction) } } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt deleted file mode 100644 index 65cba6092..000000000 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionProcessorImpl.kt +++ /dev/null @@ -1,59 +0,0 @@ -package co.nilin.opex.accountant.core.service - -import co.nilin.opex.accountant.core.api.FinancialActionProcessor -import co.nilin.opex.accountant.core.model.FinancialAction -import co.nilin.opex.accountant.core.model.FinancialActionStatus -import co.nilin.opex.accountant.core.spi.FinancialActionLoader -import co.nilin.opex.accountant.core.spi.FinancialActionPersister -import co.nilin.opex.accountant.core.spi.FinancialActionPublisher -import org.slf4j.LoggerFactory - -class FinancialActionProcessorImpl( - private val financialActionLoader: FinancialActionLoader, - private val financialActionPersister: FinancialActionPersister, - private val financialActionPublisher: FinancialActionPublisher -) : FinancialActionProcessor { - - private val logger = LoggerFactory.getLogger(FinancialActionProcessor::class.java) - - override suspend fun process(financialAction: FinancialAction) { - val list = ArrayList() - extractParents(financialAction, list) - process(list) - } - - override suspend fun process(financialActions: List) { - for (i in financialActions.indices) { - val fa = financialActions[i] - try { - if (fa.status == FinancialActionStatus.CREATED) - submitEvent(fa) - } catch (e: Exception) { - logger.error("Unable to submit financial action $fa", e) - break - } - financialActionPersister.updateStatus(fa.uuid, FinancialActionStatus.PROCESSED) - } - } - - override suspend fun batchProcess(offset: Long, size: Long) { - val factions = financialActionLoader.loadUnprocessed(offset, size) - val list = ArrayList() - factions.forEach { extractParents(it, list) } - process(list) - } - - private suspend fun submitEvent(financialAction: FinancialAction) { - financialActionPublisher.publish(financialAction) - } - - fun extractParents(financialAction: FinancialAction, list: ArrayList) { - if (financialAction.parent != null) { - extractParents(financialAction.parent, list) - } - - if (!list.contains(financialAction)) - list.add(financialAction) - } - -} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index 90e9f0709..26333c70c 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -1,11 +1,11 @@ package co.nilin.opex.accountant.core.service -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.OrderManager import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.inout.RichOrder import co.nilin.opex.accountant.core.inout.RichOrderUpdate import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.* @@ -22,7 +22,7 @@ open class OrderManagerImpl( private val orderPersister: OrderPersister, private val tempEventPersister: TempEventPersister, private val richOrderPublisher: RichOrderPublisher, - private val financialActionProcessor: FinancialActionProcessor + private val financialActionPublisher: FinancialActionPublisher ) : OrderManager { @Transactional @@ -96,8 +96,9 @@ open class OrderManagerImpl( OrderStatus.REQUESTED.code ) ) - financialActionProcessor.process(financialAction) - return financialActionPersister.persist(listOf(financialAction)) + val fa = financialActionPersister.persist(listOf(financialAction)) + publishFinancialAction(financialAction) + return fa } @Transactional @@ -160,8 +161,9 @@ open class OrderManagerImpl( OrderStatus.REJECTED ) ) - financialActionProcessor.process(financialAction) - return financialActionPersister.persist(listOf(financialAction)) + val fa = financialActionPersister.persist(listOf(financialAction)) + publishFinancialAction(financialAction) + return fa } @Transactional @@ -205,8 +207,9 @@ open class OrderManagerImpl( OrderStatus.REJECTED ) ) - financialActionProcessor.process(financialAction) - return financialActionPersister.persist(listOf(financialAction)) + val fa = financialActionPersister.persist(listOf(financialAction)) + publishFinancialAction(financialAction) + return fa } private suspend fun publishRichOrder(order: Order, remainedQuantity: BigDecimal, status: OrderStatus? = null) { @@ -242,4 +245,14 @@ open class OrderManagerImpl( ) ) } + + private suspend fun publishFinancialAction(financialAction: FinancialAction) { + if (financialAction.parent != null) + publishFinancialAction(financialAction.parent) + + if (financialAction.status == FinancialActionStatus.CREATED) { + financialActionPublisher.publish(financialAction) + financialActionPersister.updateStatus(financialAction, FinancialActionStatus.PROCESSED) + } + } } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt index 7b7451da9..2016fb037 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt @@ -1,12 +1,12 @@ package co.nilin.opex.accountant.core.service import co.nilin.opex.accountant.core.api.FeeCalculator -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.api.TradeManager import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.inout.RichOrderUpdate import co.nilin.opex.accountant.core.inout.RichTrade import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent @@ -23,14 +23,14 @@ open class TradeManagerImpl( private val richTradePublisher: RichTradePublisher, private val richOrderPublisher: RichOrderPublisher, private val feeCalculator: FeeCalculator, - private val financialActionProcessor: FinancialActionProcessor + private val financialActionPublisher: FinancialActionPublisher ) : TradeManager { - private val log = LoggerFactory.getLogger(TradeManagerImpl::class.java) + private val logger = LoggerFactory.getLogger(TradeManagerImpl::class.java) @Transactional override suspend fun handleTrade(trade: TradeEvent): List { - log.info("trade event started {}", trade) + logger.info("trade event started {}", trade) val financialActions = mutableListOf() //taker order by ouid val takerOrder = orderPersister.load(trade.takerOuid) @@ -59,14 +59,14 @@ open class TradeManagerImpl( trade.matchedQuantity.toBigDecimal().multiply(makerOrder.leftSideFraction) .multiply(trade.makerPrice.toBigDecimal()).multiply(makerOrder.rightSideFraction) } - log.info("trade event configs loaded") + logger.info("trade event configs loaded") //lookup for taker parent fa val takerParentFinancialAction = financeActionLoader.findLast(trade.takerUuid, trade.takerOuid) - log.info("trade event takerParentFinancialAction {} ", takerParentFinancialAction) + logger.info("trade event takerParentFinancialAction {} ", takerParentFinancialAction) //lookup for maker parent fa val makerParentFinancialAction = financeActionLoader.findLast(trade.makerUuid, trade.makerOuid) - log.info("trade event makerParentFinancialAction {} ", makerParentFinancialAction) + logger.info("trade event makerParentFinancialAction {} ", makerParentFinancialAction) //create fa for transfer taker uuid symbol exchange wallet to maker symbol main wallet /* @@ -85,7 +85,7 @@ open class TradeManagerImpl( "main", LocalDateTime.now() ) - log.info("trade event takerTransferAction {}", takerTransferAction) + logger.info("trade event takerTransferAction {}", takerTransferAction) financialActions.add(takerTransferAction) //update taker order status @@ -94,7 +94,7 @@ open class TradeManagerImpl( takerOrder.status = 1 } orderPersister.save(takerOrder) - log.info("taker order saved {}", takerOrder) + logger.info("taker order saved {}", takerOrder) publishTakerRichOrderUpdate(takerOrder, trade) //create fa for transfer makerUuid symbol exchange wallet to taker symbol main wallet @@ -114,7 +114,7 @@ open class TradeManagerImpl( "main", LocalDateTime.now() ) - log.info("trade event makerTransferAction {}", makerTransferAction) + logger.info("trade event makerTransferAction {}", makerTransferAction) financialActions.add(makerTransferAction) //update maker order status @@ -123,7 +123,7 @@ open class TradeManagerImpl( makerOrder.status = 1 } orderPersister.save(makerOrder) - log.info("maker order saved {}", makerOrder) + logger.info("maker order saved {}", makerOrder) publishMakerRichOrderUpdate(makerOrder, trade) val feeActions = feeCalculator.createFeeActions( @@ -169,9 +169,7 @@ open class TradeManagerImpl( trade.eventDate ) ) - return financeActionPersister.persist(financialActions).also { - financialActionProcessor.process(financialActions) - } + return financeActionPersister.persist(financialActions).also { publishFinancialActions(it) } } private suspend fun publishTakerRichOrderUpdate(takerOrder: Order, trade: TradeEvent) { @@ -194,4 +192,29 @@ open class TradeManagerImpl( richOrderPublisher.publish(RichOrderUpdate(order.ouid, price, order.origQuantity, remainedQty, status)) } + + private suspend fun publishFinancialActions(financialActions: List) { + val list = arrayListOf() + financialActions.forEach { extractFAParents(it, list) } + for (fa in list) { + if (fa.status == FinancialActionStatus.CREATED) { + try { + financialActionPublisher.publish(fa) + } catch (e: Exception) { + logger.error("Cannot publish fa ${fa.uuid}", e) + break + } + financeActionPersister.updateStatus(fa, FinancialActionStatus.PROCESSED) + } + } + } + + private fun extractFAParents(financialAction: FinancialAction, list: ArrayList) { + if (financialAction.parent != null) { + extractFAParents(financialAction.parent, list) + } + + if (!list.contains(financialAction)) + list.add(financialAction) + } } \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt deleted file mode 100644 index d2ba203c6..000000000 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt +++ /dev/null @@ -1,58 +0,0 @@ -package co.nilin.opex.accountant.core.service - -import co.nilin.opex.accountant.core.model.FinancialAction -import co.nilin.opex.accountant.core.spi.FinancialActionLoader -import co.nilin.opex.accountant.core.spi.FinancialActionPersister -import co.nilin.opex.accountant.core.spi.WalletProxy -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.mockk -import kotlinx.coroutines.runBlocking -import org.junit.jupiter.api.Test -import java.math.BigDecimal -import java.time.LocalDateTime -import org.assertj.core.api.Assertions.assertThat - -class FAJobManagerImplTest { - - private val financialActionLoader = mockk() - private val financialActionPersister = mockk() - private val walletProxy = mockk() - - private val sut = FinancialActionJobManagerImpl(financialActionLoader, financialActionPersister, walletProxy) - - init { - coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.fa, Valid.fa) - coEvery { financialActionPersister.updateBatchStatus(any(), any()) } returns Unit - } - - @Test - fun given2FALoaded_whenProcessingFailed_thenUpdateStatusCalledRegardless() = runBlocking { - coEvery { - walletProxy.batchTransfer(any()) - } throws IllegalStateException() - - sut.processFinancialActions(0, 2) - coVerify(exactly = 1) { - walletProxy.batchTransfer(any()) - } - } - - @Test - fun givenFALoaded_validateParentsAreFirstInLine(): Unit = runBlocking { - val fa1 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 1) - val fa2 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 2) - val fa3 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 3) - val fa4 = FinancialAction(fa3, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 4) - val fa5 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 5) - val list = arrayListOf(fa5, fa4, fa3, fa2, fa1) - - val flatten = sut.sortAndFlattenFA(list) - - assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa2)) - assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa3)) - assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa4)) - assertThat(flatten.indexOf(fa3)).isLessThan(flatten.indexOf(fa4)) - } - -} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt deleted file mode 100644 index cccad8abb..000000000 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAManagerImplTest.kt +++ /dev/null @@ -1,115 +0,0 @@ -package co.nilin.opex.accountant.core.service - -import co.nilin.opex.accountant.core.model.FinancialAction -import co.nilin.opex.accountant.core.model.FinancialActionStatus -import co.nilin.opex.accountant.core.spi.FinancialActionLoader -import co.nilin.opex.accountant.core.spi.FinancialActionPersister -import co.nilin.opex.accountant.core.spi.FinancialActionPublisher -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.mockk -import kotlinx.coroutines.runBlocking -import org.junit.jupiter.api.Test -import java.time.LocalDateTime -import org.assertj.core.api.Assertions.assertThat - -class FAManagerImplTest { - - private val financialActionLoader = mockk() - private val financialActionPersister = mockk() - private val financialActionPublisher = mockk() - - private val sut = FinancialActionProcessorImpl( - financialActionLoader, - financialActionPersister, - financialActionPublisher - ) - - init { - coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns arrayListOf(fa2, fa1, fa5, fa3, fa4) - coEvery { financialActionPersister.persist(any()) } returns arrayListOf() - coEvery { financialActionPersister.persistWithStatus(any(), any()) } returns Unit - coEvery { financialActionPublisher.publish(any()) } returns Unit - } - - @Test - fun givenFALoaded_validateParentsAreFirstInLine(): Unit = runBlocking { - val list = arrayListOf() - sut.extractParents(fa5, list) - - assertThat(list.size).isEqualTo(4) - assertThat(list[0].id).isEqualTo(1) - assertThat(list[1].id).isEqualTo(2) - assertThat(list[2].id).isEqualTo(3) - assertThat(list[3].id).isEqualTo(5) - } - - companion object { - private val fa1 = FinancialAction( - null, - "", - "", - "BTCUSDT", - 1.0.toBigDecimal(), - "w1", - "main", - "w2", - "exchange", - LocalDateTime.now(), - id = 1 - ) - private val fa2 = FinancialAction( - fa1, - "", - "", - "BTCUSDT", - 2.0.toBigDecimal(), - "w2", - "exchange", - "w1", - "main", - LocalDateTime.now(), - id = 2 - ) - private val fa3 = FinancialAction( - fa2, - "", - "", - "BTCUSDT", - 1.5.toBigDecimal(), - "w2", - "main", - "w2", - "exchange", - LocalDateTime.now(), - id = 3 - ) - private val fa4 = FinancialAction( - fa2, - "", - "", - "BTCUSDT", - 1.0.toBigDecimal(), - "w1", - "main", - "w1", - "exchange", - LocalDateTime.now(), - id = 4 - ) - private val fa5 = FinancialAction( - fa3, - "", - "", - "BTCUSDT", - 1.1.toBigDecimal(), - "w1", - "main", - "w3", - "exchange", - LocalDateTime.now(), - id = 5 - ) - } - -} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index 35cfe70a2..021f6cba6 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -1,6 +1,5 @@ package co.nilin.opex.accountant.core.service -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.PairConfig @@ -34,7 +33,7 @@ internal class OrderManagerImplTest { private val pairConfigLoader = mockk() private val richOrderPublisher = mockk() private val userLevelLoader = mockk() - private val financialActionProcessor = mockk() + private val financialActionPublisher = mockk() private val orderManager = OrderManagerImpl( pairConfigLoader, @@ -44,7 +43,7 @@ internal class OrderManagerImplTest { orderPersister, tempEventPersister, richOrderPublisher, - financialActionProcessor + financialActionPublisher ) init { @@ -54,8 +53,9 @@ internal class OrderManagerImplTest { coEvery { tempEventPersister.saveTempEvent(any(), any()) } returns any() coEvery { financialActionLoader.findLast(any(), any()) } returns null coEvery { financialActionPersister.persist(any()) } returnsArgument (0) + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit coEvery { userLevelLoader.load(any()) } returns "*" - coEvery { financialActionProcessor.process(any()) } returns Unit + coEvery { financialActionPublisher.publish(any()) } returns Unit } @Test diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt index 1fbca03d4..9cdd02eae 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt @@ -1,6 +1,5 @@ package co.nilin.opex.accountant.core.service -import co.nilin.opex.accountant.core.api.FinancialActionProcessor import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.model.PairConfig @@ -29,7 +28,7 @@ internal class TradeManagerImplTest { private val richOrderPublisher = mockk() private val richTradePublisher = mockk() private val userLevelLoader = mockk() - private val financialActionProcessor = mockk() + private val financialActionPublisher = mockk() private val orderManager = OrderManagerImpl( pairConfigLoader, @@ -39,7 +38,7 @@ internal class TradeManagerImplTest { orderPersister, tempEventPersister, richOrderPublisher, - financialActionProcessor + financialActionPublisher ) private val tradeManager = TradeManagerImpl( @@ -50,7 +49,7 @@ internal class TradeManagerImplTest { richTradePublisher, richOrderPublisher, FeeCalculatorImpl("0x0"), - financialActionProcessor + financialActionPublisher ) init { @@ -60,8 +59,8 @@ internal class TradeManagerImplTest { coEvery { richOrderPublisher.publish(any()) } returns Unit coEvery { richTradePublisher.publish(any()) } returns Unit coEvery { userLevelLoader.load(any()) } returns "*" - coEvery { financialActionProcessor.process(any()) } returns Unit - coEvery { financialActionProcessor.process(any>()) } returns Unit + coEvery { financialActionPublisher.publish(any()) } returns Unit + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit } @Test From 8cfceabb09c2bf96f2d0c2ab97b660c47c3375c8 Mon Sep 17 00:00:00 2001 From: Peyman Date: Sat, 11 Mar 2023 19:45:44 +0330 Subject: [PATCH 5/6] Fix update status error --- .../co/nilin/opex/accountant/core/service/OrderManagerImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index 26333c70c..ed2571adc 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -252,7 +252,7 @@ open class OrderManagerImpl( if (financialAction.status == FinancialActionStatus.CREATED) { financialActionPublisher.publish(financialAction) - financialActionPersister.updateStatus(financialAction, FinancialActionStatus.PROCESSED) + financialActionPersister.updateStatus(financialAction.uuid, FinancialActionStatus.PROCESSED) } } } \ No newline at end of file From b3b39c2e8d7d15431529e45ba0a83470680404fe Mon Sep 17 00:00:00 2001 From: Peyman Date: Sat, 11 Mar 2023 19:49:54 +0330 Subject: [PATCH 6/6] Fix test error error --- .../nilin/opex/accountant/core/service/OrderManagerImplTest.kt | 1 + .../nilin/opex/accountant/core/service/TradeManagerImplTest.kt | 1 + 2 files changed, 2 insertions(+) diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index 021f6cba6..9406ff297 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -54,6 +54,7 @@ internal class OrderManagerImplTest { coEvery { financialActionLoader.findLast(any(), any()) } returns null coEvery { financialActionPersister.persist(any()) } returnsArgument (0) coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit coEvery { userLevelLoader.load(any()) } returns "*" coEvery { financialActionPublisher.publish(any()) } returns Unit } diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt index 9cdd02eae..0488eab12 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt @@ -61,6 +61,7 @@ internal class TradeManagerImplTest { coEvery { userLevelLoader.load(any()) } returns "*" coEvery { financialActionPublisher.publish(any()) } returns Unit coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit } @Test