diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index 85d3a389a..30080a8c9 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -4,10 +4,7 @@ import co.nilin.opex.accountant.app.listener.AccountantEventListener import co.nilin.opex.accountant.app.listener.AccountantTempEventListener import co.nilin.opex.accountant.app.listener.AccountantTradeListener import co.nilin.opex.accountant.app.listener.OrderListener -import co.nilin.opex.accountant.core.api.FeeCalculator -import co.nilin.opex.accountant.core.api.FinancialActionJobManager -import co.nilin.opex.accountant.core.api.OrderManager -import co.nilin.opex.accountant.core.api.TradeManager +import co.nilin.opex.accountant.core.api.* import co.nilin.opex.accountant.core.service.FinancialActionJobManagerImpl import co.nilin.opex.accountant.core.service.OrderManagerImpl import co.nilin.opex.accountant.core.service.TradeManagerImpl @@ -25,19 +22,6 @@ import org.springframework.scheduling.annotation.EnableScheduling @EnableScheduling class AppConfig { - @Bean - fun getFinancialActionJobManager( - financialActionLoader: FinancialActionLoader, - financialActionPersister: FinancialActionPersister, - walletProxy: WalletProxy - ): FinancialActionJobManager { - return FinancialActionJobManagerImpl( - financialActionLoader, - financialActionPersister, - walletProxy - ) - } - @Bean fun orderManager( pairConfigLoader: PairConfigLoader, @@ -48,6 +32,7 @@ class AppConfig { tempEventPersister: TempEventPersister, tempEventRepublisher: TempEventRepublisher, richOrderPublisher: RichOrderPublisher, + financialActionPublisher: FinancialActionPublisher, ): OrderManager { return OrderManagerImpl( pairConfigLoader, @@ -56,7 +41,8 @@ class AppConfig { financeActionLoader, orderPersister, tempEventPersister, - richOrderPublisher + richOrderPublisher, + financialActionPublisher ) } @@ -69,6 +55,7 @@ class AppConfig { richTradePublisher: RichTradePublisher, richOrderPublisher: RichOrderPublisher, feeCalculator: FeeCalculator, + financialActionPublisher: FinancialActionPublisher, ): TradeManager { return TradeManagerImpl( financeActionPersister, @@ -77,7 +64,8 @@ class AppConfig { tempEventPersister, richTradePublisher, richOrderPublisher, - feeCalculator + feeCalculator, + financialActionPublisher ) } diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt index 21feb6b90..1d559801f 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt @@ -5,18 +5,21 @@ import co.nilin.opex.accountant.core.api.TradeManager import co.nilin.opex.accountant.ports.kafka.listener.spi.TempEventListener import co.nilin.opex.matching.engine.core.eventh.events.* import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory class AccountantTempEventListener( private val orderManager: OrderManager, private val tradeManager: TradeManager ) : TempEventListener { + private val logger = LoggerFactory.getLogger(AccountantTempEventListener::class.java) + override fun id(): String { return "TempEventListener" } override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) { - println("TempEvent $event") + logger.info("TempEvent received $event") runBlocking { when (event) { is CreateOrderEvent -> orderManager.handleNewOrder(event) @@ -29,6 +32,6 @@ class AccountantTempEventListener( } } } - println("onEvent") + logger.info("TempEvent processed") } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt index 024f9034a..650db2f64 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt @@ -1,7 +1,6 @@ package co.nilin.opex.accountant.app.listener import co.nilin.opex.accountant.core.api.TradeManager -import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener import co.nilin.opex.accountant.ports.kafka.listener.spi.TradeListener import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent import kotlinx.coroutines.runBlocking diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt index b8230e70d..08a643fb8 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt @@ -1,8 +1,6 @@ package co.nilin.opex.accountant.app.scheduler -import co.nilin.opex.accountant.core.api.FinancialActionJobManager -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import org.slf4j.LoggerFactory import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled @@ -10,20 +8,27 @@ import org.springframework.stereotype.Service @Service @Profile("scheduled") -class FinancialActionsJob(val financialActionJobManager: FinancialActionJobManager) { +class FinancialActionsJob() { private val log = LoggerFactory.getLogger(FinancialActionsJob::class.java) + private val scope = CoroutineScope(Dispatchers.IO) - @Scheduled(fixedDelay = 10000) + //@Scheduled(fixedDelay = 10000, initialDelay = 10000) fun processFinancialActions() { - runBlocking(Dispatchers.IO) { + scope.ensureActive() + if (!scope.isCompleted()) + return + + scope.launch { try { //read unprocessed fa records and call transfer - financialActionJobManager.processFinancialActions(0, 100) + //financialActionProcessor.batchProcess(0, 100) } catch (e: Exception) { - log.error("Job error!", e) + log.error("Financial action manager unable to batch process", e) } } } + private fun CoroutineScope.isCompleted() = coroutineContext.job.children.all { it.isCompleted } + } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt index bb9c26062..57458f849 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/FinancialActionJobManager.kt @@ -1,5 +1,5 @@ -package co.nilin.opex.accountant.core.api - -interface FinancialActionJobManager { - suspend fun processFinancialActions(offset: Long, size: Long) -} +package co.nilin.opex.accountant.core.api + +interface FinancialActionJobManager { + suspend fun processFinancialActions(offset: Long, size: Long) +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/FinancialActionEvent.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/FinancialActionEvent.kt new file mode 100644 index 000000000..83a33d1ba --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/inout/FinancialActionEvent.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.accountant.core.inout + +import java.math.BigDecimal +import java.time.LocalDateTime + +data class FinancialActionEvent( + val uuid: String, + val symbol: String, + val amount: BigDecimal, + val sender: String, + val senderWalletType: String, + val receiver: String, + val receiverWalletType: String, + val createDate: LocalDateTime, + val transferRef: String?, + val description: String +) \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt index 96ff9a2c6..7b4589e20 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/FinancialAction.kt @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.core.model import java.math.BigDecimal import java.time.LocalDateTime +import java.util.UUID class FinancialAction( val parent: FinancialAction?, @@ -14,10 +15,19 @@ class FinancialAction( val receiver: String, val receiverWalletType: String, val createDate: LocalDateTime, - val retryCount: Int = 0, + val status: FinancialActionStatus = FinancialActionStatus.CREATED, + val uuid: String = UUID.randomUUID().toString(), val id: Long? = null ) { + override fun equals(other: Any?): Boolean { + if (other == null || other !is FinancialAction) return false + return if (id != null && other.id != null) + id == other.id + else + uuid == other.uuid + } + override fun toString(): String { return "FinancialAction(id=$id, parent=$parent, eventType='$eventType', pointer='$pointer', symbol='$symbol', amount=$amount, sender='$sender', senderWalletType='$senderWalletType', receiver='$receiver', receiverWalletType='$receiverWalletType', createDate=$createDate)" } diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt index 944bb4a22..c79004997 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt @@ -6,53 +6,45 @@ import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.spi.FinancialActionLoader import co.nilin.opex.accountant.core.spi.FinancialActionPersister +import co.nilin.opex.accountant.core.spi.FinancialActionPublisher import co.nilin.opex.accountant.core.spi.WalletProxy import org.slf4j.LoggerFactory class FinancialActionJobManagerImpl( private val financialActionLoader: FinancialActionLoader, private val financialActionPersister: FinancialActionPersister, - private val walletProxy: WalletProxy + private val financialActionPublisher: FinancialActionPublisher, ) : FinancialActionJobManager { private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java) override suspend fun processFinancialActions(offset: Long, size: Long) { val factions = financialActionLoader.loadUnprocessed(offset, size) - val flatten = sortAndFlattenFA(factions) - logger.info("Loaded ${flatten.size} factions: ${flatten.map { it.id }}") - if (factions.isEmpty()) - return + publishFinancialActions(factions) + } - try { - val requests = factions.map { - TransferRequest( - it.amount, - it.symbol, - it.sender, - it.senderWalletType, - it.receiver, - it.receiverWalletType, - null, - it.eventType + it.pointer - ) + private suspend fun publishFinancialActions(financialActions: List) { + val list = arrayListOf() + financialActions.forEach { extractFAParents(it, list) } + for (fa in list) { + if (fa.status == FinancialActionStatus.CREATED) { + try { + financialActionPublisher.publish(fa) + } catch (e: Exception) { + logger.error("Cannot publish fa ${fa.uuid}", e) + break + } + financialActionPersister.updateStatus(fa, FinancialActionStatus.PROCESSED) } - walletProxy.batchTransfer(requests) - financialActionPersister.updateBatchStatus(factions, FinancialActionStatus.PROCESSED) - } catch (e: Exception) { - logger.error("financial job error", e) } } - fun sortAndFlattenFA(list: List): Collection { - val result = arrayListOf() - - fun extractParent(fa: FinancialAction) { - if (fa.parent != null) - extractParent(fa.parent) - result.add(fa) + private fun extractFAParents(financialAction: FinancialAction, list: ArrayList) { + if (financialAction.parent != null) { + extractFAParents(financialAction.parent, list) } - list.forEach { extractParent(it) } - return result.distinctBy { it.id } + + if (!list.contains(financialAction)) + list.add(financialAction) } } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index 9c840bf73..ed2571adc 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -5,6 +5,7 @@ import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.inout.RichOrder import co.nilin.opex.accountant.core.inout.RichOrderUpdate import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.* @@ -20,7 +21,8 @@ open class OrderManagerImpl( private val financeActionLoader: FinancialActionLoader, private val orderPersister: OrderPersister, private val tempEventPersister: TempEventPersister, - private val richOrderPublisher: RichOrderPublisher + private val richOrderPublisher: RichOrderPublisher, + private val financialActionPublisher: FinancialActionPublisher ) : OrderManager { @Transactional @@ -94,7 +96,9 @@ open class OrderManagerImpl( OrderStatus.REQUESTED.code ) ) - return financialActionPersister.persist(listOf(financialAction)) + val fa = financialActionPersister.persist(listOf(financialAction)) + publishFinancialAction(financialAction) + return fa } @Transactional @@ -157,7 +161,9 @@ open class OrderManagerImpl( OrderStatus.REJECTED ) ) - return financialActionPersister.persist(listOf(financialAction)) + val fa = financialActionPersister.persist(listOf(financialAction)) + publishFinancialAction(financialAction) + return fa } @Transactional @@ -201,7 +207,9 @@ open class OrderManagerImpl( OrderStatus.REJECTED ) ) - return financialActionPersister.persist(listOf(financialAction)) + val fa = financialActionPersister.persist(listOf(financialAction)) + publishFinancialAction(financialAction) + return fa } private suspend fun publishRichOrder(order: Order, remainedQuantity: BigDecimal, status: OrderStatus? = null) { @@ -237,4 +245,14 @@ open class OrderManagerImpl( ) ) } + + private suspend fun publishFinancialAction(financialAction: FinancialAction) { + if (financialAction.parent != null) + publishFinancialAction(financialAction.parent) + + if (financialAction.status == FinancialActionStatus.CREATED) { + financialActionPublisher.publish(financialAction) + financialActionPersister.updateStatus(financialAction.uuid, FinancialActionStatus.PROCESSED) + } + } } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt index 5c4bec9f9..2016fb037 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt @@ -6,10 +6,10 @@ import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.inout.RichOrderUpdate import co.nilin.opex.accountant.core.inout.RichTrade import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent -import co.nilin.opex.matching.engine.core.model.OrderDirection import org.slf4j.LoggerFactory import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal @@ -22,14 +22,15 @@ open class TradeManagerImpl( private val tempEventPersister: TempEventPersister, private val richTradePublisher: RichTradePublisher, private val richOrderPublisher: RichOrderPublisher, - private val feeCalculator: FeeCalculator + private val feeCalculator: FeeCalculator, + private val financialActionPublisher: FinancialActionPublisher ) : TradeManager { - private val log = LoggerFactory.getLogger(TradeManagerImpl::class.java) + private val logger = LoggerFactory.getLogger(TradeManagerImpl::class.java) @Transactional override suspend fun handleTrade(trade: TradeEvent): List { - log.info("trade event started {}", trade) + logger.info("trade event started {}", trade) val financialActions = mutableListOf() //taker order by ouid val takerOrder = orderPersister.load(trade.takerOuid) @@ -58,14 +59,14 @@ open class TradeManagerImpl( trade.matchedQuantity.toBigDecimal().multiply(makerOrder.leftSideFraction) .multiply(trade.makerPrice.toBigDecimal()).multiply(makerOrder.rightSideFraction) } - log.info("trade event configs loaded") + logger.info("trade event configs loaded") //lookup for taker parent fa val takerParentFinancialAction = financeActionLoader.findLast(trade.takerUuid, trade.takerOuid) - log.info("trade event takerParentFinancialAction {} ", takerParentFinancialAction) + logger.info("trade event takerParentFinancialAction {} ", takerParentFinancialAction) //lookup for maker parent fa val makerParentFinancialAction = financeActionLoader.findLast(trade.makerUuid, trade.makerOuid) - log.info("trade event makerParentFinancialAction {} ", makerParentFinancialAction) + logger.info("trade event makerParentFinancialAction {} ", makerParentFinancialAction) //create fa for transfer taker uuid symbol exchange wallet to maker symbol main wallet /* @@ -84,7 +85,7 @@ open class TradeManagerImpl( "main", LocalDateTime.now() ) - log.info("trade event takerTransferAction {}", takerTransferAction) + logger.info("trade event takerTransferAction {}", takerTransferAction) financialActions.add(takerTransferAction) //update taker order status @@ -93,7 +94,7 @@ open class TradeManagerImpl( takerOrder.status = 1 } orderPersister.save(takerOrder) - log.info("taker order saved {}", takerOrder) + logger.info("taker order saved {}", takerOrder) publishTakerRichOrderUpdate(takerOrder, trade) //create fa for transfer makerUuid symbol exchange wallet to taker symbol main wallet @@ -113,7 +114,7 @@ open class TradeManagerImpl( "main", LocalDateTime.now() ) - log.info("trade event makerTransferAction {}", makerTransferAction) + logger.info("trade event makerTransferAction {}", makerTransferAction) financialActions.add(makerTransferAction) //update maker order status @@ -122,7 +123,7 @@ open class TradeManagerImpl( makerOrder.status = 1 } orderPersister.save(makerOrder) - log.info("maker order saved {}", makerOrder) + logger.info("maker order saved {}", makerOrder) publishMakerRichOrderUpdate(makerOrder, trade) val feeActions = feeCalculator.createFeeActions( @@ -168,7 +169,7 @@ open class TradeManagerImpl( trade.eventDate ) ) - return financeActionPersister.persist(financialActions) + return financeActionPersister.persist(financialActions).also { publishFinancialActions(it) } } private suspend fun publishTakerRichOrderUpdate(takerOrder: Order, trade: TradeEvent) { @@ -191,4 +192,29 @@ open class TradeManagerImpl( richOrderPublisher.publish(RichOrderUpdate(order.ouid, price, order.origQuantity, remainedQty, status)) } + + private suspend fun publishFinancialActions(financialActions: List) { + val list = arrayListOf() + financialActions.forEach { extractFAParents(it, list) } + for (fa in list) { + if (fa.status == FinancialActionStatus.CREATED) { + try { + financialActionPublisher.publish(fa) + } catch (e: Exception) { + logger.error("Cannot publish fa ${fa.uuid}", e) + break + } + financeActionPersister.updateStatus(fa, FinancialActionStatus.PROCESSED) + } + } + } + + private fun extractFAParents(financialAction: FinancialAction, list: ArrayList) { + if (financialAction.parent != null) { + extractFAParents(financialAction.parent, list) + } + + if (!list.contains(financialAction)) + list.add(financialAction) + } } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt index e10e32b5c..72fe8cd77 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt @@ -3,7 +3,7 @@ package co.nilin.opex.accountant.core.spi import co.nilin.opex.accountant.core.model.FinancialAction interface FinancialActionLoader { - suspend fun findLast(uuid: String, ouid: String): FinancialAction? + suspend fun findLast(userUuid: String, ouid: String): FinancialAction? suspend fun loadUnprocessed(offset: Long, size: Long): List - suspend fun countUnprocessed(uuid: String, symbol: String, eventType: String): Long + suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt index 6b66dc4c3..8da7fbfef 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt @@ -7,7 +7,11 @@ interface FinancialActionPersister { suspend fun persist(financialActions: List): List + suspend fun persistWithStatus(financialAction: FinancialAction, status: FinancialActionStatus) + suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) + suspend fun updateStatus(faUuid: String, status: FinancialActionStatus) + suspend fun updateBatchStatus(financialAction: List, status: FinancialActionStatus) } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPublisher.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPublisher.kt new file mode 100644 index 000000000..bc36bc2ab --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPublisher.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.accountant.core.spi + +import co.nilin.opex.accountant.core.model.FinancialAction + +interface FinancialActionPublisher { + suspend fun publish(fa: FinancialAction) +} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt deleted file mode 100644 index d2ba203c6..000000000 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt +++ /dev/null @@ -1,58 +0,0 @@ -package co.nilin.opex.accountant.core.service - -import co.nilin.opex.accountant.core.model.FinancialAction -import co.nilin.opex.accountant.core.spi.FinancialActionLoader -import co.nilin.opex.accountant.core.spi.FinancialActionPersister -import co.nilin.opex.accountant.core.spi.WalletProxy -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.mockk -import kotlinx.coroutines.runBlocking -import org.junit.jupiter.api.Test -import java.math.BigDecimal -import java.time.LocalDateTime -import org.assertj.core.api.Assertions.assertThat - -class FAJobManagerImplTest { - - private val financialActionLoader = mockk() - private val financialActionPersister = mockk() - private val walletProxy = mockk() - - private val sut = FinancialActionJobManagerImpl(financialActionLoader, financialActionPersister, walletProxy) - - init { - coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.fa, Valid.fa) - coEvery { financialActionPersister.updateBatchStatus(any(), any()) } returns Unit - } - - @Test - fun given2FALoaded_whenProcessingFailed_thenUpdateStatusCalledRegardless() = runBlocking { - coEvery { - walletProxy.batchTransfer(any()) - } throws IllegalStateException() - - sut.processFinancialActions(0, 2) - coVerify(exactly = 1) { - walletProxy.batchTransfer(any()) - } - } - - @Test - fun givenFALoaded_validateParentsAreFirstInLine(): Unit = runBlocking { - val fa1 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 1) - val fa2 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 2) - val fa3 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 3) - val fa4 = FinancialAction(fa3, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 4) - val fa5 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 5) - val list = arrayListOf(fa5, fa4, fa3, fa2, fa1) - - val flatten = sut.sortAndFlattenFA(list) - - assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa2)) - assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa3)) - assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa4)) - assertThat(flatten.indexOf(fa3)).isLessThan(flatten.indexOf(fa4)) - } - -} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index 201fdc255..9406ff297 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -23,7 +23,6 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.mockito.kotlin.any import java.math.BigDecimal -import java.time.LocalDateTime internal class OrderManagerImplTest { @@ -34,6 +33,7 @@ internal class OrderManagerImplTest { private val pairConfigLoader = mockk() private val richOrderPublisher = mockk() private val userLevelLoader = mockk() + private val financialActionPublisher = mockk() private val orderManager = OrderManagerImpl( pairConfigLoader, @@ -42,7 +42,8 @@ internal class OrderManagerImplTest { financialActionLoader, orderPersister, tempEventPersister, - richOrderPublisher + richOrderPublisher, + financialActionPublisher ) init { @@ -52,7 +53,10 @@ internal class OrderManagerImplTest { coEvery { tempEventPersister.saveTempEvent(any(), any()) } returns any() coEvery { financialActionLoader.findLast(any(), any()) } returns null coEvery { financialActionPersister.persist(any()) } returnsArgument (0) + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit coEvery { userLevelLoader.load(any()) } returns "*" + coEvery { financialActionPublisher.publish(any()) } returns Unit } @Test diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt index 4e577b3cb..0488eab12 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt @@ -1,5 +1,6 @@ package co.nilin.opex.accountant.core.service +import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.model.PairConfig import co.nilin.opex.accountant.core.model.PairFeeConfig @@ -27,6 +28,7 @@ internal class TradeManagerImplTest { private val richOrderPublisher = mockk() private val richTradePublisher = mockk() private val userLevelLoader = mockk() + private val financialActionPublisher = mockk() private val orderManager = OrderManagerImpl( pairConfigLoader, @@ -35,7 +37,8 @@ internal class TradeManagerImplTest { financeActionLoader, orderPersister, tempEventPersister, - richOrderPublisher + richOrderPublisher, + financialActionPublisher ) private val tradeManager = TradeManagerImpl( @@ -45,7 +48,8 @@ internal class TradeManagerImplTest { tempEventPersister, richTradePublisher, richOrderPublisher, - FeeCalculatorImpl("0x0") + FeeCalculatorImpl("0x0"), + financialActionPublisher ) init { @@ -55,6 +59,9 @@ internal class TradeManagerImplTest { coEvery { richOrderPublisher.publish(any()) } returns Unit coEvery { richTradePublisher.publish(any()) } returns Unit coEvery { userLevelLoader.load(any()) } returns "*" + coEvery { financialActionPublisher.publish(any()) } returns Unit + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit } @Test diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt index d54b99029..ae5b5930c 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt @@ -39,7 +39,7 @@ object Valid { "system", "main", currentTime, - 15 + id = 15 ) var makerOrder = Order( diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt index 4ef86a1fa..e682148f9 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt @@ -15,7 +15,7 @@ import java.math.BigDecimal interface FinancialActionRepository : ReactiveCrudRepository { @Query("select * from fi_actions fi where pointer = :ouid and :uuid in (fi.sender, fi.receiver)") - fun findByOuidAndUuid( + fun findByOuidAndUserUuid( @Param("ouid") ouid: String, @Param("uuid") uuid: String, paging: Pageable @@ -33,11 +33,11 @@ interface FinancialActionRepository : ReactiveCrudRepository @Query("update fi_actions set status = :status where id = :id") - fun updateStatus(@Param("id") id: Long, @Param("status") status: FinancialActionStatus) + fun updateStatus(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono - @Query("update fi_actions set status = :status, retry_count = retry_count + 1 where id = :id") - fun updateStatusAndIncreaseRetry(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono + @Query("update fi_actions set status = :status where uuid = :uuid") + fun updateStatus(uuid: String, status: FinancialActionStatus): Mono @Query("update fi_actions set status = :status where id in (:ids)") - fun updateStatus(ids: List, status: FinancialActionStatus): Mono + fun updateBatchStatus(ids: List, status: FinancialActionStatus): Mono } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt index 643a6f80e..34d5c35aa 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt @@ -25,16 +25,16 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe .toList() } - override suspend fun findLast(uuid: String, ouid: String): FinancialAction? { - return financialActionRepository.findByOuidAndUuid( - ouid, uuid, PageRequest.of(0, 1, Sort.by(Sort.Direction.DESC, "createDate")) + override suspend fun findLast(userUuid: String, ouid: String): FinancialAction? { + return financialActionRepository.findByOuidAndUserUuid( + ouid, userUuid, PageRequest.of(0, 1, Sort.by(Sort.Direction.DESC, "createDate")) ).map { loadFinancialAction(it.id) } .firstOrNull() } - override suspend fun countUnprocessed(uuid: String, symbol: String, eventType: String): Long { + override suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long { return financialActionRepository.findByUuidAndSymbolAndEventTypeAndStatus( - uuid, + userUuid, symbol, eventType, FinancialActionStatus.CREATED @@ -55,7 +55,8 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe fim.receiver, fim.receiverWalletType, fim.createDate, - fim.retryCount, + fim.status, + fim.uuid, fim.id ) } diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt index 7cb5bb08a..b9fd7e1aa 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt @@ -7,6 +7,7 @@ import co.nilin.opex.accountant.ports.postgres.dao.FinancialActionRepository import co.nilin.opex.accountant.ports.postgres.model.FinancialActionModel import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactor.awaitSingle +import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.stereotype.Component @Component @@ -17,6 +18,7 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ financialActionRepository.saveAll(financialActions.map { FinancialActionModel( null, + it.uuid, it.parent?.id, it.eventType, it.pointer, @@ -34,11 +36,39 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ return financialActions } + override suspend fun persistWithStatus(financialAction: FinancialAction, status: FinancialActionStatus) { + financialActionRepository.save( + with(financialAction) { + FinancialActionModel( + null, + uuid, + parent?.id, + eventType, + pointer, + symbol, + amount, + sender, + senderWalletType, + receiver, + receiverWalletType, + "", + "", + createDate, + status + ) + } + ).awaitSingle() + } + override suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) { - financialActionRepository.updateStatusAndIncreaseRetry(financialAction.id!!, status).awaitFirstOrNull() + financialActionRepository.updateStatus(financialAction.id!!, status).awaitSingleOrNull() + } + + override suspend fun updateStatus(faUuid: String, status: FinancialActionStatus) { + financialActionRepository.updateStatus(faUuid, status).awaitSingleOrNull() } override suspend fun updateBatchStatus(financialAction: List, status: FinancialActionStatus) { - financialActionRepository.updateStatus(financialAction.mapNotNull { it.id }, status).awaitFirstOrNull() + financialActionRepository.updateBatchStatus(financialAction.mapNotNull { it.id }, status).awaitFirstOrNull() } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt index e7f5ac076..497c87f32 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt @@ -10,6 +10,7 @@ import java.time.LocalDateTime @Table("fi_actions") data class FinancialActionModel( @Id var id: Long?, + val uuid: String, @Column("parent_id") var parentId: Long?, @Column("event_type") val eventType: String, val pointer: String, @@ -22,9 +23,7 @@ data class FinancialActionModel( val agent: String, val ip: String, @Column("create_date") val createDate: LocalDateTime, - val status: FinancialActionStatus = FinancialActionStatus.CREATED, - @Column("retry_count") val retryCount: Int = 0, - @Column("last_try_date") val lastTryDate: LocalDateTime? = null + val status: FinancialActionStatus = FinancialActionStatus.CREATED ) diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql index 762694d1a..32c0aa5d2 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql @@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS orders CREATE TABLE IF NOT EXISTS fi_actions ( id SERIAL PRIMARY KEY, + uuid VARCHAR(72) NOT NULL UNIQUE, parent_id INTEGER, event_type VARCHAR(72) NOT NULL, pointer VARCHAR(72) NOT NULL, @@ -42,9 +43,7 @@ CREATE TABLE IF NOT EXISTS fi_actions agent VARCHAR(20), ip VARCHAR(11), create_date TIMESTAMP NOT NULL, - status VARCHAR(20), - retry_count DECIMAL, - last_try_date TIMESTAMP + status VARCHAR(20) ); CREATE TABLE IF NOT EXISTS pair_config diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt index 5813cb072..65f723937 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt @@ -17,7 +17,7 @@ class FAPersisterImplTest { private val financialActionRepository = mockk { coEvery { saveAll(any() as Iterable) } returns Flux.just(Valid.faModel) - coEvery { updateStatusAndIncreaseRetry(any(), any()) } returns Mono.empty() + coEvery { updateBatchStatus(any(), any()) } returns Mono.empty() } private val faPersister = FinancialActionPersisterImpl(financialActionRepository) @@ -29,9 +29,10 @@ class FAPersisterImplTest { @Test fun givenFAAndStatus_whenUpdatingStatusAndFANotFound_throwException(): Unit = runBlocking { + coEvery { financialActionRepository.updateStatus(eq(Valid.fa.id!!), any()) } returns Mono.empty() faPersister.updateStatus(Valid.fa, FinancialActionStatus.CREATED) coVerify { - financialActionRepository.updateStatusAndIncreaseRetry( + financialActionRepository.updateStatus( eq(Valid.fa.id!!), eq(FinancialActionStatus.CREATED) ) diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt index 9931a14ff..6d98cc55c 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt @@ -14,6 +14,7 @@ import co.nilin.opex.matching.engine.core.model.OrderType import co.nilin.opex.matching.engine.core.model.Pair import java.math.BigDecimal import java.time.LocalDateTime +import java.util.UUID object Valid { @@ -128,12 +129,13 @@ object Valid { "system", "main", currentTime, - 1, - 1 + id = 1, + uuid = "uuid" ) val faModel = FinancialActionModel( null, + "uuid", null, TradeEvent::class.java.name, "trade_id", @@ -145,7 +147,7 @@ object Valid { "main", "", "", - currentTime + currentTime, ) } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt index b8ca2965f..d323a4547 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt @@ -35,6 +35,14 @@ class KafkaTopicConfig { .build() }) + registerBean("topic_fiAction", NewTopic::class.java, Supplier { + TopicBuilder.name("fiAction") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1) } logger.info("Kafka topics created") diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt index 800a82105..ada140c75 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt @@ -1,7 +1,9 @@ package co.nilin.opex.accountant.ports.kafka.submitter.config +import co.nilin.opex.accountant.core.inout.FinancialActionEvent import co.nilin.opex.accountant.core.inout.RichOrderEvent import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer @@ -27,7 +29,7 @@ class SubmitterKafkaConfig { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", - JsonSerializer.TYPE_MAPPINGS to "rich_order_event:co.nilin.opex.accountant.core.inout.RichOrderEvent,rich_order:co.nilin.opex.accountant.core.inout.RichOrder,rich_order_update:co.nilin.opex.accountant.core.inout.RichOrderUpdate, rich_trade:co.nilin.opex.accountant.core.inout.RichTrade" + JsonSerializer.TYPE_MAPPINGS to "rich_order_event:co.nilin.opex.accountant.core.inout.RichOrderEvent,rich_order:co.nilin.opex.accountant.core.inout.RichOrder,rich_order_update:co.nilin.opex.accountant.core.inout.RichOrderUpdate, rich_trade:co.nilin.opex.accountant.core.inout.RichTrade,financial_action:co.nilin.opex.accountant.core.inout.FinancialActionEvent" //ProducerConfig.CLIENT_ID_CONFIG to "", omitting this option as it produces InstanceAlreadyExistsException ) } @@ -61,4 +63,14 @@ class SubmitterKafkaConfig { fun richOrderKafkaTemplate(@Qualifier("richOrderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } + + @Bean("fiActionProducerFactory") + fun fiActionProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("fiActionKafkaTemplate") + fun fiActionKafkaTemplate(@Qualifier("fiActionProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/FinancialActionSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/FinancialActionSubmitter.kt new file mode 100644 index 000000000..cbd66dd8b --- /dev/null +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/FinancialActionSubmitter.kt @@ -0,0 +1,49 @@ +package co.nilin.opex.accountant.ports.kafka.submitter.service + +import co.nilin.opex.accountant.core.inout.FinancialActionEvent +import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.spi.FinancialActionPublisher +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Component +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +@Component +class FinancialActionSubmitter( + @Qualifier("fiActionKafkaTemplate") + private val kafkaTemplate: KafkaTemplate +) : FinancialActionPublisher, EventPublisher { + + private val logger = LoggerFactory.getLogger(FinancialActionSubmitter::class.java) + override val topic: String = "fiAction" + + override suspend fun publish(fa: FinancialAction): Unit = suspendCoroutine { cont -> + logger.info("Sending financial action event") + val sendFuture = kafkaTemplate.send( + topic, + with(fa) { + FinancialActionEvent( + uuid, + symbol, + amount, + sender, + senderWalletType, + receiver, + receiverWalletType, + createDate, + null, + eventType + pointer + ) + } + ) + sendFuture.addCallback({ + cont.resume(Unit) + }, { + logger.error("Error submitting financial action ${fa.uuid}", it) + cont.resumeWithException(it) + }) + } +} \ No newline at end of file diff --git a/docker-images/vault/Dockerfile b/docker-images/vault/Dockerfile index a4e183276..38d4f35fa 100644 --- a/docker-images/vault/Dockerfile +++ b/docker-images/vault/Dockerfile @@ -1,4 +1,4 @@ -FROM vault:1.12.2 +FROM vault:1.11.7 COPY ["backend-policy.hcl", "panel-policy.hcl", "vault.json", "workflow-vault.sh", "/vault/config/"] EXPOSE 8200 ENTRYPOINT /vault/config/workflow-vault.sh diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt index 445c8502a..6ffd2b2cb 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt @@ -1,8 +1,11 @@ package co.nilin.opex.wallet.app.config import co.nilin.opex.wallet.ports.kafka.listener.consumer.AdminEventKafkaListener +import co.nilin.opex.wallet.ports.kafka.listener.consumer.FinancialActionKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.consumer.UserCreatedKafkaListener +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent import co.nilin.opex.wallet.ports.kafka.listener.spi.AdminEventListener +import co.nilin.opex.wallet.ports.kafka.listener.spi.FinancialActionEventListener import co.nilin.opex.wallet.ports.kafka.listener.spi.UserCreatedEventListener import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.SerializationFeature @@ -22,9 +25,12 @@ class AppConfig { userCreatedEventListener: UserCreatedEventListener, adminKafkaEventListener: AdminEventKafkaListener, adminEventListener: AdminEventListener, + financialActionKafkaListener: FinancialActionKafkaListener, + financialActionEventListener: FinancialActionEventListener, ) { useCreatedKafkaListener.addEventListener(userCreatedEventListener) adminKafkaEventListener.addEventListener(adminEventListener) + financialActionKafkaListener.addEventListener(financialActionEventListener) } @Bean diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/FinancialActionEventListenerImpl.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/FinancialActionEventListenerImpl.kt new file mode 100644 index 000000000..ba0a3e425 --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/FinancialActionEventListenerImpl.kt @@ -0,0 +1,35 @@ +package co.nilin.opex.wallet.app.listener + +import co.nilin.opex.wallet.app.service.TransferService +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.FinancialActionEventListener +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class FinancialActionEventListenerImpl(private val transferService: TransferService) : FinancialActionEventListener { + + private val logger = LoggerFactory.getLogger(FinancialActionEventListenerImpl::class.java) + + override fun id(): String { + return "FinancialActionEventListener" + } + + override fun onEvent(event: FinancialActionEvent, partition: Int, offset: Long, timestamp: Long) { + logger.info("On FinancialActionEvent ${event.uuid}") + runBlocking(Dispatchers.IO) { + transferService.transfer( + event.symbol, + event.senderWalletType, + event.sender, + event.receiverWalletType, + event.receiver, + event.amount, + event.description, + event.transferRef + ) + } + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt index 574120a10..391904973 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -1,6 +1,7 @@ package co.nilin.opex.wallet.ports.kafka.listener.config import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer @@ -39,6 +40,16 @@ class KafkaProducerConfig { return KafkaTemplate(factory) } + @Bean("financialActionProducerFactory") + fun financialActionProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("financialActionKafkaTemplate") + fun financialActionKafkaTemplate(factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + @Bean fun adminProducerFactory(@Qualifier("producerConfigs") config: Map): ProducerFactory { return DefaultKafkaProducerFactory(config) diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt index e89b86978..0c48ed5f9 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt @@ -1,8 +1,10 @@ package co.nilin.opex.wallet.ports.kafka.listener.config import co.nilin.opex.wallet.ports.kafka.listener.consumer.AdminEventKafkaListener +import co.nilin.opex.wallet.ports.kafka.listener.consumer.FinancialActionKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.consumer.UserCreatedKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition @@ -38,7 +40,7 @@ class WalletKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "user_created_event:co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent,admin_add_currency:co.nilin.opex.wallet.ports.kafka.listener.model.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.wallet.ports.kafka.listener.model.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.wallet.ports.kafka.listener.model.DeleteCurrencyEvent" + JsonDeserializer.TYPE_MAPPINGS to "user_created_event:co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent,admin_add_currency:co.nilin.opex.wallet.ports.kafka.listener.model.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.wallet.ports.kafka.listener.model.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.wallet.ports.kafka.listener.model.DeleteCurrencyEvent,financial_action:co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent" ) } @@ -47,6 +49,11 @@ class WalletKafkaConfig { return DefaultKafkaConsumerFactory(consumerConfigs) } + @Bean("financialActionConsumerFactory") + fun financialActionConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } + @Bean fun adminEventsConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) @@ -67,6 +74,21 @@ class WalletKafkaConfig { container.start() } + @Autowired + @ConditionalOnBean(FinancialActionKafkaListener::class) + fun configureFinancialActionListener( + listener: FinancialActionKafkaListener, + template: KafkaTemplate, + @Qualifier("financialActionConsumerFactory") consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile("fiAction")) + containerProps.messageListener = listener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("FinancialActionKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "fiAction.DLT") + container.start() + } + @Autowired @ConditionalOnBean(AdminEventKafkaListener::class) fun configureAdminEventListener( diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/FinancialActionKafkaListener.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/FinancialActionKafkaListener.kt new file mode 100644 index 000000000..08a4e196c --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/FinancialActionKafkaListener.kt @@ -0,0 +1,24 @@ +package co.nilin.opex.wallet.ports.kafka.listener.consumer + +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.FinancialActionEventListener +import co.nilin.opex.wallet.ports.kafka.listener.spi.UserCreatedEventListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component + +@Component +class FinancialActionKafkaListener : MessageListener { + + val eventListeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + eventListeners.forEach { + it.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) + } + } + + fun addEventListener(tl: FinancialActionEventListener) { + eventListeners.add(tl) + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/FinancialActionEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/FinancialActionEvent.kt new file mode 100644 index 000000000..ceee87c78 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/FinancialActionEvent.kt @@ -0,0 +1,46 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +import java.math.BigDecimal +import java.time.LocalDateTime + +class FinancialActionEvent { + + lateinit var uuid: String + lateinit var symbol: String + lateinit var amount: BigDecimal + lateinit var sender: String + lateinit var senderWalletType: String + lateinit var receiver: String + lateinit var receiverWalletType: String + lateinit var createDate: LocalDateTime + var transferRef: String? = null + lateinit var description: String + + constructor() { + + } + + constructor( + uuid: String, + symbol: String, + amount: BigDecimal, + sender: String, + senderWalletType: String, + receiver: String, + receiverWalletType: String, + createDate: LocalDateTime, + transferRef: String, + description: String + ) { + this.uuid = uuid + this.symbol = symbol + this.amount = amount + this.sender = sender + this.senderWalletType = senderWalletType + this.receiver = receiver + this.receiverWalletType = receiverWalletType + this.createDate = createDate + this.transferRef = transferRef + this.description = description + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/FinancialActionEventListener.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/FinancialActionEventListener.kt new file mode 100644 index 000000000..908b63cfd --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/FinancialActionEventListener.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.wallet.ports.kafka.listener.spi + +import co.nilin.opex.wallet.ports.kafka.listener.model.FinancialActionEvent + +interface FinancialActionEventListener { + + fun id(): String + + fun onEvent(event: FinancialActionEvent, partition: Int, offset: Long, timestamp: Long) + +} \ No newline at end of file