diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index b944ed180..e4fe77f13 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -104,7 +104,7 @@ class AppConfig { @Autowired fun configureOrderListener(orderKafkaListener: OrderKafkaListener, orderListener: OrderListener) { - orderKafkaListener.addOrderListener(orderListener) + orderKafkaListener.addListener(orderListener) } @Autowired @@ -112,7 +112,7 @@ class AppConfig { tradeKafkaListener: TradeKafkaListener, accountantTradeListener: AccountantTradeListener ) { - tradeKafkaListener.addTradeListener(accountantTradeListener) + tradeKafkaListener.addListener(accountantTradeListener) } @Autowired @@ -120,7 +120,7 @@ class AppConfig { eventKafkaListener: EventKafkaListener, accountantEventListener: AccountantEventListener ) { - eventKafkaListener.addEventListener(accountantEventListener) + eventKafkaListener.addListener(accountantEventListener) } @Autowired @@ -128,7 +128,7 @@ class AppConfig { tempEventKafkaListener: TempEventKafkaListener, accountantTempEventListener: AccountantTempEventListener ) { - tempEventKafkaListener.addEventListener(accountantTempEventListener) + tempEventKafkaListener.addListener(accountantTempEventListener) } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantEventListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantEventListener.kt index e1de3e767..80dd26f72 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantEventListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantEventListener.kt @@ -11,15 +11,15 @@ class AccountantEventListener(private val orderManager: OrderManager) : EventLis return "EventListener" } - override fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) { + override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) { runBlocking { - when (coreEvent) { - is CreateOrderEvent -> orderManager.handleNewOrder(coreEvent) - is RejectOrderEvent -> orderManager.handleRejectOrder(coreEvent) - is UpdatedOrderEvent -> orderManager.handleUpdateOrder(coreEvent) - is CancelOrderEvent -> orderManager.handleCancelOrder(coreEvent) + when (event) { + is CreateOrderEvent -> orderManager.handleNewOrder(event) + is RejectOrderEvent -> orderManager.handleRejectOrder(event) + is UpdatedOrderEvent -> orderManager.handleUpdateOrder(event) + is CancelOrderEvent -> orderManager.handleCancelOrder(event) else -> { - println("Event is not accepted ${coreEvent::class.java}") + println("Event is not accepted ${event::class.java}") } } } diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt index c5056fcb7..21feb6b90 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTempEventListener.kt @@ -15,17 +15,17 @@ class AccountantTempEventListener( return "TempEventListener" } - override fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) { - println("TempEvent $coreEvent") + override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) { + println("TempEvent $event") runBlocking { - when (coreEvent) { - is CreateOrderEvent -> orderManager.handleNewOrder(coreEvent) - is RejectOrderEvent -> orderManager.handleRejectOrder(coreEvent) - is UpdatedOrderEvent -> orderManager.handleUpdateOrder(coreEvent) - is CancelOrderEvent -> orderManager.handleCancelOrder(coreEvent) - is TradeEvent -> tradeManager.handleTrade(coreEvent) + when (event) { + is CreateOrderEvent -> orderManager.handleNewOrder(event) + is RejectOrderEvent -> orderManager.handleRejectOrder(event) + is UpdatedOrderEvent -> orderManager.handleUpdateOrder(event) + is CancelOrderEvent -> orderManager.handleCancelOrder(event) + is TradeEvent -> tradeManager.handleTrade(event) else -> { - throw IllegalArgumentException("Event is not accepted ${coreEvent::class.java}") + throw IllegalArgumentException("Event is not accepted ${event::class.java}") } } } diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt index 829e6c88d..024f9034a 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/AccountantTradeListener.kt @@ -1,6 +1,7 @@ package co.nilin.opex.accountant.app.listener import co.nilin.opex.accountant.core.api.TradeManager +import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener import co.nilin.opex.accountant.ports.kafka.listener.spi.TradeListener import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent import kotlinx.coroutines.runBlocking @@ -11,9 +12,9 @@ class AccountantTradeListener(private val tradeManager: TradeManager) : TradeLis return "TradeListener" } - override fun onTrade(tradeEvent: TradeEvent, partition: Int, offset: Long, timestamp: Long) { + override fun onEvent(event: TradeEvent, partition: Int, offset: Long, timestamp: Long) { runBlocking { - tradeManager.handleTrade(tradeEvent) + tradeManager.handleTrade(event) } } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt index ac0286ad4..456cd1a79 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt @@ -15,22 +15,22 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest return "OrderListener" } - override fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { + override fun onEvent(event: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { runBlocking { - logger.info("Order submit event received ${order.ouid}") + logger.info("Order submit event received ${event.ouid}") orderManager.handleRequestOrder( SubmitOrderEvent( - order.ouid, - order.uuid, - order.orderId, - order.pair, - order.price, - order.quantity, - order.quantity, - order.direction, - order.matchConstraint, - order.orderType + event.ouid, + event.uuid, + event.orderId, + event.pair, + event.price, + event.quantity, + event.quantity, + event.direction, + event.matchConstraint, + event.orderType ) ) } diff --git a/accountant/accountant-core/pom.xml b/accountant/accountant-core/pom.xml index 022876966..fb20af05c 100644 --- a/accountant/accountant-core/pom.xml +++ b/accountant/accountant-core/pom.xml @@ -50,8 +50,17 @@ provided - org.mockito - mockito-core + org.mockito.kotlin + mockito-kotlin + + + io.mockk + mockk + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + test diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt new file mode 100644 index 000000000..3318eac7d --- /dev/null +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FAJobManagerImplTest.kt @@ -0,0 +1,112 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.model.FinancialActionStatus +import co.nilin.opex.accountant.core.spi.FinancialActionLoader +import co.nilin.opex.accountant.core.spi.FinancialActionPersister +import co.nilin.opex.accountant.core.spi.WalletProxy +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.mockk +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test + +class FAJobManagerImplTest { + + private val financialActionLoader = mockk() + private val financialActionPersister = mockk() + private val walletProxy = mockk() + + private val sut = FinancialActionJobManagerImpl(financialActionLoader, financialActionPersister, walletProxy) + + init { + coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.fa, Valid.fa) + coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit + } + + @Test + fun given2FALoaded_whenProcessing_thenVerifyThatTransferProxyCalled2Times() = runBlocking { + coEvery { walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) } returns Unit + sut.processFinancialActions(0, 2) + with(Valid.fa) { + coVerify(exactly = 2) { + walletProxy.transfer( + eq(symbol), + eq(senderWalletType), + eq(sender), + eq(receiverWalletType), + eq(receiver), + eq(amount), + eq(eventType + pointer), + any() + ) + } + coVerify(exactly = 2) { + financialActionPersister.updateStatus( + eq(this@with), + eq(FinancialActionStatus.PROCESSED) + ) + } + } + } + + @Test + fun given2FALoaded_whenProcessingFailed_thenUpdateStatusCalledRegardless() = runBlocking { + coEvery { + walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) + } throws IllegalStateException() + + sut.processFinancialActions(0, 2) + with(Valid.fa) { + coVerify(exactly = 2) { + walletProxy.transfer( + eq(symbol), + eq(senderWalletType), + eq(sender), + eq(receiverWalletType), + eq(receiver), + eq(amount), + eq(eventType + pointer), + any() + ) + } + coVerify(exactly = 2) { + financialActionPersister.updateStatus( + eq(this@with), + eq(FinancialActionStatus.CREATED) + ) + } + } + } + + @Test + fun given2FALoaded_whenProcessingFailedAndRetryCountExceeded_thenUpdateStatusCalledRegardless() = runBlocking { + coEvery { + walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) + } throws IllegalStateException() + + coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.faHighRetry) + + sut.processFinancialActions(0, 1) + with(Valid.faHighRetry) { + coVerify(exactly = 1) { + walletProxy.transfer( + eq(symbol), + eq(senderWalletType), + eq(sender), + eq(receiverWalletType), + eq(receiver), + eq(amount), + eq(eventType + pointer), + any() + ) + } + coVerify(exactly = 1) { + financialActionPersister.updateStatus( + eq(this@with), + eq(FinancialActionStatus.ERROR) + ) + } + } + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt new file mode 100644 index 000000000..15d1b20dd --- /dev/null +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt @@ -0,0 +1,72 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import java.time.LocalDateTime + +internal class FeeCalculatorImplTest { + + private val receiverAddress = "0x0" + private val feeCalculator = FeeCalculatorImpl(receiverAddress) + + @Test + fun givenTradeEventsAndOrders_whenFeeCalculated_feeActionsNotNull(): Unit = runBlocking { + val actions = + feeCalculator.createFeeActions(Valid.tradeEvent, Valid.makerOrder, Valid.takerOrder, null, null) + assertThat(actions.makerFeeAction).isNotNull + assertThat(actions.takerFeeAction).isNotNull + } + + @Test + fun givenTradeEventsAndOrders_whenFeeCalculated_returnCorrectFees(): Unit = runBlocking { + val actions = + feeCalculator.createFeeActions(Valid.tradeEvent, Valid.makerOrder, Valid.takerOrder, null, null) + with(actions.makerFeeAction) { + assertThat(amount.toDouble()).isEqualTo(0.01) // 1% of 1 BTC + assertThat(symbol).isEqualTo("BTC") + assertThat(sender).isEqualTo("user_1") + assertThat(pointer).isEqualTo("order_2") + assertThat(receiver).isEqualTo(receiverAddress) + assertThat(receiverWalletType).isEqualTo("exchange") + } + + with(actions.takerFeeAction) { + assertThat(amount.toDouble()).isEqualTo(500.0) // 1% of 50,000 USDT + assertThat(symbol).isEqualTo("USDT") + assertThat(sender).isEqualTo("user_2") + assertThat(pointer).isEqualTo("order_1") + assertThat(receiver).isEqualTo(receiverAddress) + assertThat(receiverWalletType).isEqualTo("exchange") + } + } + + @Test + fun givenTradeEventsAndOrders_whenFAParentNotNull_thenFeeActionParentNotNull(): Unit = runBlocking { + val parentFA = FinancialAction( + null, + TradeEvent::class.java.name, + "trade_id", + "BTC_USDT", + 10000.0.toBigDecimal(), + "user_parent", + "main", + "system", + "main", + Valid.currentTime + ) + + val actions = feeCalculator.createFeeActions( + Valid.tradeEvent, + Valid.makerOrder, + Valid.takerOrder, + parentFA, + parentFA + ) + assertThat(actions.makerFeeAction.parent).isNotNull + assertThat(actions.takerFeeAction.parent).isNotNull + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/MockitoHelper.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/MockitoHelper.kt deleted file mode 100644 index b40a99504..000000000 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/MockitoHelper.kt +++ /dev/null @@ -1,13 +0,0 @@ -package co.nilin.opex.accountant.core.service - -import org.mockito.Mockito - -object MockitoHelper { - fun anyObject(): T { - Mockito.any() - return uninitialized() - } - - @Suppress("UNCHECKED_CAST") - fun uninitialized(): T = null as T -} \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index c939fbff4..66193ff06 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -1,194 +1,309 @@ package co.nilin.opex.accountant.core.service -import co.nilin.opex.accountant.core.api.OrderManager +import co.nilin.opex.accountant.core.inout.OrderStatus import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.PairConfig import co.nilin.opex.accountant.core.model.PairFeeConfig import co.nilin.opex.accountant.core.spi.* +import co.nilin.opex.matching.engine.core.eventh.events.CancelOrderEvent +import co.nilin.opex.matching.engine.core.eventh.events.CreateOrderEvent +import co.nilin.opex.matching.engine.core.eventh.events.RejectOrderEvent import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent +import co.nilin.opex.matching.engine.core.inout.RejectReason +import co.nilin.opex.matching.engine.core.inout.RequestedOperation import co.nilin.opex.matching.engine.core.model.MatchConstraint import co.nilin.opex.matching.engine.core.model.OrderDirection import co.nilin.opex.matching.engine.core.model.OrderType +import co.nilin.opex.matching.engine.core.model.Pair +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.mockk import kotlinx.coroutines.runBlocking -import org.junit.jupiter.api.Assertions.assertEquals +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test -import org.mockito.ArgumentMatchers.anyString -import org.mockito.Mock -import org.mockito.Mockito -import org.mockito.MockitoAnnotations +import org.mockito.kotlin.any import java.math.BigDecimal import java.time.LocalDateTime internal class OrderManagerImplTest { - @Mock - lateinit var financialActionPersister: FinancialActionPersister + private val financialActionPersister = mockk() + private val financialActionLoader = mockk() + private val orderPersister = mockk() + private val tempEventPersister = mockk() + private val pairConfigLoader = mockk() + private val richOrderPublisher = mockk() - @Mock - lateinit var financialActionLoader: FinancialActionLoader + private val orderManager = OrderManagerImpl( + pairConfigLoader, + financialActionPersister, + financialActionLoader, + orderPersister, + tempEventPersister, + richOrderPublisher + ) - @Mock - lateinit var orderPersister: OrderPersister + init { + coEvery { tempEventPersister.loadTempEvents(any()) } returns emptyList() + coEvery { orderPersister.save(any()) } returnsArgument (0) + coEvery { richOrderPublisher.publish(any()) } returnsArgument (0) + coEvery { tempEventPersister.saveTempEvent(any(), any()) } returns any() + coEvery { financialActionLoader.findLast(any(), any()) } returns null + coEvery { financialActionPersister.persist(any()) } returnsArgument (0) + } - @Mock - lateinit var tempEventPersister: TempEventPersister + @Test + fun givenAskOrder_whenHandleRequestOrder_thenFAMatch(): Unit = runBlocking { + //given + val pair = Pair("ETH", "BTC") + val pairConfig = PairConfig( + pair.toString(), + pair.leftSideName, + pair.rightSideName, + BigDecimal.valueOf(1.0), + BigDecimal.valueOf(0.001) + ) + val submitOrderEvent = SubmitOrderEvent( + "ouid", "uuid", null, pair, 30, 60, 0, OrderDirection.ASK, MatchConstraint.GTC, OrderType.LIMIT_ORDER + ) - @Mock - lateinit var pairConfigLoader: PairConfigLoader + coEvery { + pairConfigLoader.load(pair.toString(), submitOrderEvent.direction, "") + } returns PairFeeConfig( + pairConfig, + submitOrderEvent.direction.toString(), + "", + BigDecimal.valueOf(0.1), + BigDecimal.valueOf(0.12) + ) - @Mock - lateinit var richOrderPublisher: RichOrderPublisher + coEvery { financialActionPersister.persist(any()) } returnsArgument (0) - private val orderManager: OrderManager + //when + val financialActions = orderManager.handleRequestOrder(submitOrderEvent) - init { - MockitoAnnotations.openMocks(this) - orderManager = OrderManagerImpl( - pairConfigLoader, - financialActionPersister, - financialActionLoader, - orderPersister, - tempEventPersister, - richOrderPublisher + //then + assertThat(financialActions.size).isEqualTo(1) + val expectedFinancialAction = FinancialAction( + null, + SubmitOrderEvent::class.simpleName!!, + submitOrderEvent.ouid, + pair.leftSideName, + pairConfig.leftSideFraction.multiply(submitOrderEvent.quantity.toBigDecimal()), + submitOrderEvent.uuid, + "main", + submitOrderEvent.uuid, + "exchange", + Valid.currentTime ) - runBlocking { - Mockito.`when`(tempEventPersister.loadTempEvents(anyString())).thenReturn(emptyList()) + + with(expectedFinancialAction) { + assertThat(eventType).isEqualTo(financialActions[0].eventType) + assertThat(symbol).isEqualTo(financialActions[0].symbol) + assertThat(amount).isEqualTo(financialActions[0].amount) + assertThat(sender).isEqualTo(financialActions[0].sender) + assertThat(senderWalletType).isEqualTo(financialActions[0].senderWalletType) + assertThat(receiver).isEqualTo(financialActions[0].receiver) + assertThat(receiverWalletType).isEqualTo(financialActions[0].receiverWalletType) } } @Test - fun givenAskOrder_whenHandleRequestOrder_thenFAMatch() { - runBlocking { - //given - val pair = co.nilin.opex.matching.engine.core.model.Pair("eth", "btc") - val pairConfig = PairConfig( - pair.toString(), - pair.leftSideName, - pair.rightSideName, - BigDecimal.valueOf(1.0), - BigDecimal.valueOf(0.001) - ) - val submitOrderEvent = SubmitOrderEvent( - "ouid", "uuid", null, pair, 30, 60, 0, OrderDirection.ASK, MatchConstraint.GTC, OrderType.LIMIT_ORDER - ) - Mockito.`when`(pairConfigLoader.load(pair.toString(), submitOrderEvent.direction, "")) - .thenReturn( - PairFeeConfig( - pairConfig, - submitOrderEvent.direction.toString(), - "", - BigDecimal.valueOf(0.1), - BigDecimal.valueOf(0.12) - ) - ) - Mockito.`when`(financialActionPersister.persist(MockitoHelper.anyObject())) - .then { - return@then it.getArgument>(0) - } - - //when - val financialActions = orderManager.handleRequestOrder(submitOrderEvent) - - //then - assertEquals(1, financialActions.size) - val expectedFinancialAction = FinancialAction( - null, - SubmitOrderEvent::class.simpleName!!, - submitOrderEvent.ouid, - pair.leftSideName, - pairConfig.leftSideFraction.multiply(submitOrderEvent.quantity.toBigDecimal()), - submitOrderEvent.uuid, - "main", - submitOrderEvent.uuid, - "exchange", - LocalDateTime.now() - ) - assertEquals(expectedFinancialAction.eventType, financialActions[0].eventType) - assertEquals(expectedFinancialAction.symbol, financialActions[0].symbol) - assertEquals(expectedFinancialAction.amount, financialActions[0].amount) - assertEquals(expectedFinancialAction.sender, financialActions[0].sender) - assertEquals(expectedFinancialAction.senderWalletType, financialActions[0].senderWalletType) - assertEquals(expectedFinancialAction.receiver, financialActions[0].receiver) - assertEquals(expectedFinancialAction.receiverWalletType, financialActions[0].receiverWalletType) + fun givenBidOrder_whenHandleRequestOrder_thenFAMatch(): Unit = runBlocking { + //given + val pair = Pair("eth", "btc") + val pairConfig = PairConfig( + pair.toString(), + pair.leftSideName, + pair.rightSideName, + BigDecimal.valueOf(1.0), + BigDecimal.valueOf(0.001) + ) + val submitOrderEvent = SubmitOrderEvent( + "ouid", "uuid", null, pair, 35, 14, 0, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER + ) + + coEvery { + pairConfigLoader.load(pair.toString(), submitOrderEvent.direction, "") + } returns PairFeeConfig( + pairConfig, + submitOrderEvent.direction.toString(), + "", + BigDecimal.valueOf(0.08), + BigDecimal.valueOf(0.1) + ) + + coEvery { financialActionPersister.persist(any()) } returnsArgument (0) + + //when + val financialActions = orderManager.handleRequestOrder(submitOrderEvent) + + //then + assertThat(financialActions.size).isEqualTo(1) + val expectedFinancialAction = FinancialAction( + null, + SubmitOrderEvent::class.simpleName!!, + submitOrderEvent.ouid, + pair.rightSideName, + pairConfig.leftSideFraction.multiply(submitOrderEvent.quantity.toBigDecimal()) + .multiply(pairConfig.rightSideFraction) + .multiply(submitOrderEvent.price.toBigDecimal()), + submitOrderEvent.uuid, + "main", + submitOrderEvent.uuid, + "exchange", + Valid.currentTime + ) + with(expectedFinancialAction) { + assertThat(eventType).isEqualTo(financialActions[0].eventType) + assertThat(symbol).isEqualTo(financialActions[0].symbol) + assertThat(amount).isEqualTo(financialActions[0].amount) + assertThat(sender).isEqualTo(financialActions[0].sender) + assertThat(senderWalletType).isEqualTo(financialActions[0].senderWalletType) + assertThat(receiver).isEqualTo(financialActions[0].receiver) + assertThat(receiverWalletType).isEqualTo(financialActions[0].receiverWalletType) } } @Test - fun givenBidOrder_whenHandleRequestOrder_thenFAMatch() { - runBlocking { - //given - val pair = co.nilin.opex.matching.engine.core.model.Pair("eth", "btc") - val pairConfig = PairConfig( - pair.toString(), - pair.leftSideName, - pair.rightSideName, - BigDecimal.valueOf(1.0), - BigDecimal.valueOf(0.001) - ) - val submitOrderEvent = SubmitOrderEvent( - "ouid", "uuid", null, pair, 35, 14, 0, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER - ) - Mockito.`when`(pairConfigLoader.load(pair.toString(), submitOrderEvent.direction, "")) - .thenReturn( - PairFeeConfig( - pairConfig, - submitOrderEvent.direction.toString(), - "", - BigDecimal.valueOf(0.08), - BigDecimal.valueOf(0.1) - ) - ) - Mockito.`when`(financialActionPersister.persist(MockitoHelper.anyObject())) - .then { - return@then it.getArgument>(0) - } - - //when - val financialActions = runBlocking { - orderManager.handleRequestOrder(submitOrderEvent) - } - - //then - assertEquals(1, financialActions.size) - val expectedFinancialAction = FinancialAction( - null, - SubmitOrderEvent::class.simpleName!!, - submitOrderEvent.ouid, - pair.rightSideName, - pairConfig.leftSideFraction.multiply(submitOrderEvent.quantity.toBigDecimal()) - .multiply(pairConfig.rightSideFraction) - .multiply(submitOrderEvent.price.toBigDecimal()), - submitOrderEvent.uuid, - "main", - submitOrderEvent.uuid, - "exchange", - LocalDateTime.now() - ) - assertEquals(expectedFinancialAction.eventType, financialActions[0].eventType) - assertEquals(expectedFinancialAction.symbol, financialActions[0].symbol) - assertEquals(expectedFinancialAction.amount, financialActions[0].amount) - assertEquals(expectedFinancialAction.sender, financialActions[0].sender) - assertEquals(expectedFinancialAction.senderWalletType, financialActions[0].senderWalletType) - assertEquals(expectedFinancialAction.receiver, financialActions[0].receiver) - assertEquals(expectedFinancialAction.receiverWalletType, financialActions[0].receiverWalletType) - } + fun givenNewOrderEventReceived_whenUpdatingOrder_matchingEngineIdMatch(): Unit = runBlocking { + val orderEvent = CreateOrderEvent( + "order_ouid", + "user_1", + 55, + Pair("BTC", "USDT"), + 100000, + 1000, + 0, + OrderDirection.BID + ) + + coEvery { orderPersister.load(any()) } returns Valid.order + + val fa = orderManager.handleNewOrder(orderEvent) + + assertThat(fa.size).isEqualTo(0) + assertThat(Valid.order.matchingEngineId).isEqualTo(55) + coVerify(exactly = 1) { richOrderPublisher.publish(any()) } + } + + @Test + fun givenNewOrderEventReceived_whenLocalOrderNull_saveTempEvent(): Unit = runBlocking { + val orderEvent = CreateOrderEvent( + "order_ouid", + "user_1", + 55, + Pair("BTC", "USDT"), + 100000, + 1000, + 0, + OrderDirection.BID + ) + + coEvery { orderPersister.load(any()) } returns null + + val fa = orderManager.handleNewOrder(orderEvent) + + assertThat(fa.size).isEqualTo(0) + coVerify(exactly = 1) { tempEventPersister.saveTempEvent(any(), any()) } } @Test - fun handleNewOrder() { + fun givenRejectOrderReceived_whenLocalOrderNull_saveTempEvent(): Unit = runBlocking { + val orderEvent = RejectOrderEvent( + "ouid", + "user_1", + 56, + Pair("BTC", "USDT"), + RequestedOperation.CANCEL_ORDER, + RejectReason.ORDER_NOT_FOUND + ) + + coEvery { orderPersister.load(any()) } returns null + + val fa = orderManager.handleRejectOrder(orderEvent) + + assertThat(fa.size).isEqualTo(0) + coVerify(exactly = 1) { tempEventPersister.saveTempEvent(any(), any()) } } @Test - fun handleUpdateOrder() { + fun givenRejectOrderReceived_whenLocalFound_publishRichOrderUpdate(): Unit = runBlocking { + val orderEvent = RejectOrderEvent( + "ouid", + "user_1", + 56, + Pair("BTC", "USDT"), + 100000, + 1000, + OrderDirection.BID, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER, + RequestedOperation.CANCEL_ORDER, + RejectReason.ORDER_NOT_FOUND, + ) + coEvery { orderPersister.load(any()) } returns Valid.order + + val fa = orderManager.handleRejectOrder(orderEvent)[0] + + assertThat(fa.amount).isEqualTo(Valid.order.remainedTransferAmount) + assertThat(fa.symbol).isEqualTo(orderEvent.pair.rightSideName) + assertThat(Valid.order.status).isEqualTo(OrderStatus.REJECTED.code) + + coVerify(exactly = 1) { richOrderPublisher.publish(any()) } + coVerify(exactly = 1) { orderPersister.save(any()) } } @Test - fun handleRejectOrder() { + fun givenCancelOrderReceived_whenLocalOrderNull_saveTempEvent(): Unit = runBlocking { + val orderEvent = CancelOrderEvent( + "order_ouid", + "user_id", + 88, + Pair("BTC", "USDT"), + 100000, + 1000, + 500, + OrderDirection.BID + ) + + coEvery { orderPersister.load(any()) } returns null + + val fa = orderManager.handleCancelOrder(orderEvent) + + assertThat(fa.size).isEqualTo(0) + coVerify(exactly = 1) { tempEventPersister.saveTempEvent(any(), any()) } + } + + @Test + fun givenCancelOrderReceived_whenLocalFound_publishRichOrderUpdate(): Unit = runBlocking { + val orderEvent = CancelOrderEvent( + "order_ouid", + "user_id", + 88, + Pair("BTC", "USDT"), + 100000, + 1000, + 500, + OrderDirection.BID + ) + coEvery { orderPersister.load(any()) } returns Valid.order + + val fa = orderManager.handleCancelOrder(orderEvent)[0] + + assertThat(fa.amount).isEqualTo(Valid.order.remainedTransferAmount) + assertThat(fa.symbol).isEqualTo(orderEvent.pair.rightSideName) + assertThat(Valid.order.status).isEqualTo(OrderStatus.CANCELED.code) + + coVerify(exactly = 1) { richOrderPublisher.publish(any()) } + coVerify(exactly = 1) { orderPersister.save(any()) } } + //TODO @Test - fun handleCancelOrder() { + fun handleUpdateOrder() { } + } diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt index add9d374b..34de594b0 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt @@ -1,8 +1,5 @@ package co.nilin.opex.accountant.core.service -import co.nilin.opex.accountant.core.api.OrderManager -import co.nilin.opex.accountant.core.api.TradeManager -import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.model.PairConfig import co.nilin.opex.accountant.core.model.PairFeeConfig @@ -13,175 +10,146 @@ import co.nilin.opex.matching.engine.core.model.MatchConstraint import co.nilin.opex.matching.engine.core.model.OrderDirection import co.nilin.opex.matching.engine.core.model.OrderType import co.nilin.opex.matching.engine.core.model.Pair +import io.mockk.coEvery +import io.mockk.mockk import kotlinx.coroutines.runBlocking -import org.junit.jupiter.api.Assertions +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test -import org.mockito.ArgumentMatchers -import org.mockito.Mock -import org.mockito.Mockito -import org.mockito.MockitoAnnotations import java.math.BigDecimal internal class TradeManagerImplTest { - @Mock - lateinit var financialActionPersister: FinancialActionPersister - - @Mock - lateinit var financeActionLoader: FinancialActionLoader - - @Mock - lateinit var orderPersister: OrderPersister - - @Mock - lateinit var pairConfigLoader: PairConfigLoader - - @Mock - lateinit var tempEventPersister: TempEventPersister - - @Mock - lateinit var tempEventRepublisher: TempEventRepublisher - - @Mock - lateinit var richOrderPublisher: RichOrderPublisher - - @Mock - lateinit var richTradePublisher: RichTradePublisher - - private val orderManager: OrderManager - private val tradeManager: TradeManager + private val financialActionPersister = mockk() + private val financeActionLoader = mockk() + private val orderPersister = mockk() + private val pairConfigLoader = mockk() + private val tempEventPersister = mockk() + private val richOrderPublisher = mockk() + private val richTradePublisher = mockk() + + private val orderManager = OrderManagerImpl( + pairConfigLoader, + financialActionPersister, + financeActionLoader, + orderPersister, + tempEventPersister, + richOrderPublisher + ) + + private val tradeManager = TradeManagerImpl( + financialActionPersister, + financeActionLoader, + orderPersister, + tempEventPersister, + richTradePublisher, + richOrderPublisher, + FeeCalculatorImpl("0x0") + ) init { - MockitoAnnotations.openMocks(this) - - orderManager = OrderManagerImpl( - pairConfigLoader, - financialActionPersister, - financeActionLoader, - orderPersister, - tempEventPersister, - richOrderPublisher - ) - - tradeManager = TradeManagerImpl( - financialActionPersister, - financeActionLoader, - orderPersister, - tempEventPersister, - richTradePublisher, - richOrderPublisher, - FeeCalculatorImpl("0x0") - ) - - runBlocking { - Mockito.`when`(tempEventPersister.loadTempEvents(ArgumentMatchers.anyString())).thenReturn(emptyList()) - } + coEvery { tempEventPersister.loadTempEvents(any()) } returns emptyList() + coEvery { orderPersister.save(any()) } returnsArgument(0) + coEvery { financeActionLoader.findLast(any(),any()) } returns null + coEvery { richOrderPublisher.publish(any()) } returns Unit + coEvery { richTradePublisher.publish(any()) } returns Unit } @Test - fun givenSellOrder_WhenMatchBuyOrderCome_thenFAMatched() { - runBlocking { - //given - val pair = Pair("eth", "btc") - val pairConfig = PairConfig( - pair.toString(), - pair.leftSideName, - pair.rightSideName, - BigDecimal.valueOf(1.0), - BigDecimal.valueOf(0.01) - ) - val makerSubmitOrderEvent = SubmitOrderEvent( - "mouid", - "muuid", - null, - pair, - 60000, - 1, - 0, - OrderDirection.ASK, - MatchConstraint.GTC, - OrderType.LIMIT_ORDER - ) - prepareOrder(pair, pairConfig, makerSubmitOrderEvent, BigDecimal.valueOf(0.1), BigDecimal.valueOf(0.12)) + fun givenSellOrder_WhenMatchBuyOrderCome_thenFAMatched(): Unit = runBlocking { + //given + val pair = Pair("eth", "btc") + val pairConfig = PairConfig( + pair.toString(), + pair.leftSideName, + pair.rightSideName, + BigDecimal.valueOf(1.0), + BigDecimal.valueOf(0.01) + ) + val makerSubmitOrderEvent = SubmitOrderEvent( + "mouid", + "muuid", + null, + pair, + 60000, + 1, + 0, + OrderDirection.ASK, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER + ) + prepareOrder(pair, pairConfig, makerSubmitOrderEvent, BigDecimal.valueOf(0.1), BigDecimal.valueOf(0.12)) - val takerSubmitOrderEvent = SubmitOrderEvent( - "touid", - "tuuid", - null, - pair, - 70000, - 1, - 0, - OrderDirection.BID, - MatchConstraint.GTC, - OrderType.LIMIT_ORDER - ) + val takerSubmitOrderEvent = SubmitOrderEvent( + "touid", + "tuuid", + null, + pair, + 70000, + 1, + 0, + OrderDirection.BID, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER + ) - prepareOrder(pair, pairConfig, takerSubmitOrderEvent, BigDecimal.valueOf(0.08), BigDecimal.valueOf(0.1)) + prepareOrder(pair, pairConfig, takerSubmitOrderEvent, BigDecimal.valueOf(0.08), BigDecimal.valueOf(0.1)) - val tradeEvent = makeTradeEvent(pair, takerSubmitOrderEvent, makerSubmitOrderEvent) - //when - val tradeFinancialActions = tradeManager.handleTrade(tradeEvent) + val tradeEvent = makeTradeEvent(pair, takerSubmitOrderEvent, makerSubmitOrderEvent) + //when + val tradeFinancialActions = tradeManager.handleTrade(tradeEvent) - Assertions.assertEquals(4, tradeFinancialActions.size) - Assertions.assertEquals( - (makerSubmitOrderEvent.price.toBigDecimal() * pairConfig.rightSideFraction).stripTrailingZeros(), - tradeFinancialActions[0].amount.stripTrailingZeros() - ) - } + assertThat(tradeFinancialActions.size).isEqualTo(4) + assertThat((makerSubmitOrderEvent.price.toBigDecimal() * pairConfig.rightSideFraction).stripTrailingZeros()) + .isEqualTo(tradeFinancialActions[0].amount.stripTrailingZeros()) } @Test - fun givenBuyOrder_WhenMatchSellOrderCome_thenFAMatched() { - runBlocking { - //given - val pair = Pair("eth", "btc") - val pairConfig = PairConfig( - pair.toString(), - pair.leftSideName, - pair.rightSideName, - BigDecimal.valueOf(1.0), - BigDecimal.valueOf(0.001) - ) - val makerSubmitOrderEvent = SubmitOrderEvent( - "mouid", - "muuid", - null, - pair, - 70000, - 1, - 0, - OrderDirection.BID, - MatchConstraint.GTC, - OrderType.LIMIT_ORDER - ) - prepareOrder(pair, pairConfig, makerSubmitOrderEvent, BigDecimal.valueOf(0.1), BigDecimal.valueOf(0.12)) + fun givenBuyOrder_WhenMatchSellOrderCome_thenFAMatched(): Unit = runBlocking { + //given + val pair = Pair("eth", "btc") + val pairConfig = PairConfig( + pair.toString(), + pair.leftSideName, + pair.rightSideName, + BigDecimal.valueOf(1.0), + BigDecimal.valueOf(0.001) + ) + val makerSubmitOrderEvent = SubmitOrderEvent( + "mouid", + "muuid", + null, + pair, + 70000, + 1, + 0, + OrderDirection.BID, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER + ) + prepareOrder(pair, pairConfig, makerSubmitOrderEvent, BigDecimal.valueOf(0.1), BigDecimal.valueOf(0.12)) - val takerSubmitOrderEvent = SubmitOrderEvent( - "touid", - "tuuid", - null, - pair, - 60000, - 1, - 0, - OrderDirection.ASK, - MatchConstraint.GTC, - OrderType.LIMIT_ORDER - ) + val takerSubmitOrderEvent = SubmitOrderEvent( + "touid", + "tuuid", + null, + pair, + 60000, + 1, + 0, + OrderDirection.ASK, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER + ) - prepareOrder(pair, pairConfig, takerSubmitOrderEvent, BigDecimal.valueOf(0.08), BigDecimal.valueOf(0.1)) + prepareOrder(pair, pairConfig, takerSubmitOrderEvent, BigDecimal.valueOf(0.08), BigDecimal.valueOf(0.1)) - val tradeEvent = makeTradeEvent(pair, takerSubmitOrderEvent, makerSubmitOrderEvent) - //when - val tradeFinancialActions = tradeManager.handleTrade(tradeEvent) + val tradeEvent = makeTradeEvent(pair, takerSubmitOrderEvent, makerSubmitOrderEvent) + //when + val tradeFinancialActions = tradeManager.handleTrade(tradeEvent) - Assertions.assertEquals(4, tradeFinancialActions.size) - Assertions.assertEquals( - (makerSubmitOrderEvent.price.toBigDecimal() * pairConfig.rightSideFraction).stripTrailingZeros(), - tradeFinancialActions[1].amount.stripTrailingZeros() - ) - } + assertThat(tradeFinancialActions.size).isEqualTo(4) + assertThat((makerSubmitOrderEvent.price.toBigDecimal() * pairConfig.rightSideFraction).stripTrailingZeros()) + .isEqualTo(tradeFinancialActions[1].amount.stripTrailingZeros()) } private fun makeTradeEvent( @@ -208,60 +176,53 @@ internal class TradeManagerImplTest { ) } - private fun prepareOrder( + private suspend fun prepareOrder( pair: Pair, pairConfig: PairConfig, submitOrderEvent: SubmitOrderEvent, makerFee: BigDecimal, takerFee: BigDecimal ) { - runBlocking { - Mockito.`when`(pairConfigLoader.load(pair.toString(), submitOrderEvent.direction, "")) - .thenReturn( - PairFeeConfig( - pairConfig, - submitOrderEvent.direction.toString(), - "", - makerFee, - takerFee - ) - ) - Mockito.`when`(financialActionPersister.persist(MockitoHelper.anyObject())) - .then { - return@then it.getArgument>(0) - } - - val financialActions = orderManager.handleRequestOrder(submitOrderEvent) - - val orderPairFeeConfig = - pairConfigLoader.load(submitOrderEvent.pair.toString(), submitOrderEvent.direction, "") - val orderMakerFee = orderPairFeeConfig.makerFee * BigDecimal.ONE //user level formula - val orderTakerFee = orderPairFeeConfig.takerFee * BigDecimal.ONE //user level formula - Mockito.`when`(orderPersister.load(submitOrderEvent.ouid)).thenReturn( - Order( - submitOrderEvent.pair.toString(), - submitOrderEvent.ouid, - null, - orderMakerFee, - orderTakerFee, - orderPairFeeConfig.pairConfig.leftSideFraction, - orderPairFeeConfig.pairConfig.rightSideFraction, - submitOrderEvent.uuid, - "", - submitOrderEvent.direction, - submitOrderEvent.matchConstraint, - submitOrderEvent.orderType, - submitOrderEvent.price, - submitOrderEvent.quantity, - submitOrderEvent.quantity - submitOrderEvent.remainedQuantity, - submitOrderEvent.price.toBigDecimal(), - submitOrderEvent.quantity.toBigDecimal(), - (submitOrderEvent.quantity - submitOrderEvent.remainedQuantity).toBigDecimal(), - financialActions[0].amount, - financialActions[0].amount, - 0 - ) - ) - } + coEvery { + pairConfigLoader.load(pair.toString(), submitOrderEvent.direction, "") + } returns PairFeeConfig( + pairConfig, + submitOrderEvent.direction.toString(), + "", + makerFee, + takerFee + ) + coEvery { financialActionPersister.persist(any()) } returnsArgument (0) + + val financialActions = orderManager.handleRequestOrder(submitOrderEvent) + + val orderPairFeeConfig = + pairConfigLoader.load(submitOrderEvent.pair.toString(), submitOrderEvent.direction, "") + val orderMakerFee = orderPairFeeConfig.makerFee * BigDecimal.ONE //user level formula + val orderTakerFee = orderPairFeeConfig.takerFee * BigDecimal.ONE //user level formula + + coEvery { orderPersister.load(submitOrderEvent.ouid) } returns Order( + submitOrderEvent.pair.toString(), + submitOrderEvent.ouid, + null, + orderMakerFee, + orderTakerFee, + orderPairFeeConfig.pairConfig.leftSideFraction, + orderPairFeeConfig.pairConfig.rightSideFraction, + submitOrderEvent.uuid, + "", + submitOrderEvent.direction, + submitOrderEvent.matchConstraint, + submitOrderEvent.orderType, + submitOrderEvent.price, + submitOrderEvent.quantity, + submitOrderEvent.quantity - submitOrderEvent.remainedQuantity, + submitOrderEvent.price.toBigDecimal(), + submitOrderEvent.quantity.toBigDecimal(), + (submitOrderEvent.quantity - submitOrderEvent.remainedQuantity).toBigDecimal(), + financialActions[0].amount, + financialActions[0].amount, + 0 + ) } } \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt new file mode 100644 index 000000000..d54b99029 --- /dev/null +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/Valid.kt @@ -0,0 +1,135 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.inout.OrderStatus +import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.Order +import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent +import co.nilin.opex.matching.engine.core.model.MatchConstraint +import co.nilin.opex.matching.engine.core.model.OrderDirection +import co.nilin.opex.matching.engine.core.model.OrderType +import co.nilin.opex.matching.engine.core.model.Pair +import java.math.BigDecimal +import java.time.LocalDateTime + +object Valid { + + val currentTime = LocalDateTime.now() + + val fa = FinancialAction( + null, + TradeEvent::class.java.name, + "trade_id", + "BTC_USDT", + 10000.0.toBigDecimal(), + "user_parent", + "main", + "system", + "main", + currentTime + ) + + val faHighRetry = FinancialAction( + null, + TradeEvent::class.java.name, + "trade_id", + "BTC_USDT", + 10000.0.toBigDecimal(), + "user_parent", + "main", + "system", + "main", + currentTime, + 15 + ) + + var makerOrder = Order( + "BTC_USDT", + "order_1", + 1, + 0.01.toBigDecimal(), + 0.01.toBigDecimal(), + 0.000001.toBigDecimal(), + 0.01.toBigDecimal(), + "user_1", + "*", + OrderDirection.BID, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER, + 50_000.toBigDecimal().divide(0.01.toBigDecimal()).longValueExact(), + 1.toBigDecimal().divide(0.000001.toBigDecimal()).longValueExact(), + 1.toBigDecimal().divide(0.000001.toBigDecimal()).longValueExact(), + 50_000.toBigDecimal(), + 1.toBigDecimal(), + BigDecimal.ZERO, + BigDecimal.ZERO, + BigDecimal.ZERO, + OrderStatus.FILLED.code + ) + + var takerOrder = Order( + "BTC_USDT", + "order_2", + 2, + 0.01.toBigDecimal(), + 0.01.toBigDecimal(), + 0.000001.toBigDecimal(), + 0.01.toBigDecimal(), + "user_2", + "*", + OrderDirection.ASK, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER, + 50_000.toBigDecimal().divide(0.01.toBigDecimal()).longValueExact(), + 1.toBigDecimal().divide(0.000001.toBigDecimal()).longValueExact(), + 1.toBigDecimal().divide(0.000001.toBigDecimal()).longValueExact(), + 50_000.toBigDecimal(), + 1.toBigDecimal(), + BigDecimal.ZERO, + BigDecimal.ZERO, + BigDecimal.ZERO, + OrderStatus.FILLED.code + ) + + var tradeEvent = TradeEvent( + 1, + Pair("BTC", "USDT"), + "order_2", + "user_2", + 2, + OrderDirection.ASK, + (50_000 / 0.01).toLong(), + 0, + "order_1", + "user_1", + 1, + OrderDirection.BID, + (50_000 / 0.01).toLong(), + 0, + (1 / 0.000001).toLong() + ) + + val order = Order( + "BTC_USDT", + "order_ouid", + null, + 0.01.toBigDecimal(), + 0.01.toBigDecimal(), + 0.000001.toBigDecimal(), + 0.01.toBigDecimal(), + "user_1", + "*", + OrderDirection.BID, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER, + 100000, + 1000, + 0, + BigDecimal.ZERO, + BigDecimal.ZERO, + BigDecimal.ZERO, + BigDecimal.ZERO, + 100000.0.toBigDecimal(), + OrderStatus.NEW.code + ) + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml b/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml index 53582bc51..8bb43937e 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml @@ -57,5 +57,9 @@ spring-kafka-test test + + io.mockk + mockk + diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/EventConsumer.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/EventConsumer.kt new file mode 100644 index 000000000..68ba4eec8 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/EventConsumer.kt @@ -0,0 +1,31 @@ +package co.nilin.opex.accountant.ports.kafka.listener.consumer + +import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener + +abstract class EventConsumer, K, V> : MessageListener { + + protected val listeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + listeners.forEach { it.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) } + } + + fun getListener(id: String): L? { + return listeners.find { it.id() == id } + } + + fun countListeners(): Int { + return listeners.size + } + + fun addListener(listener: L) { + listeners.add(listener) + } + + fun removeListener(listener: L) { + listeners.removeIf { it.id() == listener.id() } + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/EventKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/EventKafkaListener.kt index 3c8a58bb3..02dc816aa 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/EventKafkaListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/EventKafkaListener.kt @@ -2,28 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.listener.consumer import co.nilin.opex.accountant.ports.kafka.listener.spi.EventListener import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.springframework.kafka.listener.MessageListener import org.springframework.stereotype.Component @Component -class EventKafkaListener : MessageListener { - - val eventListeners = arrayListOf() - - override fun onMessage(data: ConsumerRecord) { - eventListeners.forEach { tl -> - tl.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) - } - } - - fun addEventListener(tl: EventListener) { - eventListeners.add(tl) - } - - fun removeEventListener(tl: EventListener) { - eventListeners.removeIf { item -> - item.id() == tl.id() - } - } -} \ No newline at end of file +class EventKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt index 1a6edd9db..08b3d7df5 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt @@ -2,28 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.listener.consumer import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.springframework.kafka.listener.MessageListener import org.springframework.stereotype.Component @Component -class OrderKafkaListener : MessageListener { - - val orderListeners = arrayListOf() - - override fun onMessage(data: ConsumerRecord) { - orderListeners.forEach { tl -> - tl.onOrder(data.value(), data.partition(), data.offset(), data.timestamp()) - } - } - - fun addOrderListener(tl: OrderSubmitRequestListener) { - orderListeners.add(tl) - } - - fun removeOrderListener(tl: OrderSubmitRequestListener) { - orderListeners.removeIf { item -> - item.id() == tl.id() - } - } -} \ No newline at end of file +class OrderKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TempEventKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TempEventKafkaListener.kt index 485f99a2b..6fb848219 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TempEventKafkaListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TempEventKafkaListener.kt @@ -1,31 +1,8 @@ package co.nilin.opex.accountant.ports.kafka.listener.consumer - import co.nilin.opex.accountant.ports.kafka.listener.spi.TempEventListener import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.springframework.kafka.listener.MessageListener import org.springframework.stereotype.Component @Component -class TempEventKafkaListener : MessageListener { - - val eventListeners = arrayListOf() - - override fun onMessage(data: ConsumerRecord) { - println("TempEvent onMessage") - eventListeners.forEach { tl -> - tl.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) - } - } - - fun addEventListener(tl: TempEventListener) { - eventListeners.add(tl) - } - - fun removeEventListener(tl: TempEventListener) { - eventListeners.removeIf { item -> - item.id() == tl.id() - } - } -} \ No newline at end of file +class TempEventKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TradeKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TradeKafkaListener.kt index 4f10f4b17..a048a6983 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TradeKafkaListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/TradeKafkaListener.kt @@ -2,28 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.listener.consumer import co.nilin.opex.accountant.ports.kafka.listener.spi.TradeListener import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.springframework.kafka.listener.MessageListener import org.springframework.stereotype.Component @Component -class TradeKafkaListener : MessageListener { - - val tradeListeners = arrayListOf() - - override fun onMessage(data: ConsumerRecord) { - tradeListeners.forEach { tl -> - tl.onTrade(data.value(), data.partition(), data.offset(), data.timestamp()) - } - } - - fun addTradeListener(tl: TradeListener) { - tradeListeners.add(tl) - } - - fun removeTradeListener(tl: TradeListener) { - tradeListeners.removeIf { item -> - item.id() == tl.id() - } - } -} \ No newline at end of file +class TradeKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/EventListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/EventListener.kt index 50304b05d..8c2f020dc 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/EventListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/EventListener.kt @@ -2,7 +2,4 @@ package co.nilin.opex.accountant.ports.kafka.listener.spi import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -interface EventListener { - fun id(): String - fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) -} \ No newline at end of file +interface EventListener : Listener \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/Listener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/Listener.kt new file mode 100644 index 000000000..a5886d313 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/Listener.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.accountant.ports.kafka.listener.spi + +interface Listener { + + fun id(): String + + fun onEvent(event: T, partition: Int, offset: Long, timestamp: Long) + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt index 71756af2c..ac9a24904 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt @@ -2,7 +2,4 @@ package co.nilin.opex.accountant.ports.kafka.listener.spi import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest -interface OrderSubmitRequestListener { - fun id(): String - fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) -} \ No newline at end of file +interface OrderSubmitRequestListener : Listener \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TempEventListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TempEventListener.kt index 6d069c573..479df1a3b 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TempEventListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TempEventListener.kt @@ -2,7 +2,4 @@ package co.nilin.opex.accountant.ports.kafka.listener.spi import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -interface TempEventListener { - fun id(): String - fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) -} \ No newline at end of file +interface TempEventListener : Listener \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TradeListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TradeListener.kt index 1d277a953..46f14a0d8 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TradeListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/TradeListener.kt @@ -2,7 +2,4 @@ package co.nilin.opex.accountant.ports.kafka.listener.spi import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent -interface TradeListener { - fun id(): String - fun onTrade(tradeEvent: TradeEvent, partition: Int, offset: Long, timestamp: Long) -} \ No newline at end of file +interface TradeListener : Listener \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/ConsumerTest.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/ConsumerTest.kt new file mode 100644 index 000000000..ea5b998ab --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/ConsumerTest.kt @@ -0,0 +1,52 @@ +package co.nilin.opex.accountant.ports.kafka.listener + +import co.nilin.opex.accountant.ports.kafka.listener.consumer.ConsumerObject +import co.nilin.opex.accountant.ports.kafka.listener.consumer.ListenerObject +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test + +class ConsumerTest { + + private val consumer = ConsumerObject() + private val listener = mockk() + + init { + coEvery { listener.onEvent(any(), any(), any(), any()) } returns Unit + } + + @Test + fun givenEventConsumer_onMessage_callListener() { + consumer.addListener(listener) + consumer.onMessage(ConsumerRecord("topic", 1, 0, null, "value")) + + coVerify(exactly = consumer.countListeners()) { listener.onEvent(eq("value"), eq(1), eq(0), any()) } + } + + @Test + fun givenEventConsumer_onMessageWith2Listeners_callListener() { + consumer.addListener(listener) + consumer.addListener(listener) + consumer.onMessage(ConsumerRecord("topic", 1, 0, null, "value")) + + coVerify(exactly = 2) { listener.onEvent(eq("value"), eq(1), eq(0), any()) } + } + + @Test + fun givenEventConsumer_whenAdding1Listener_listenerCountIs1() { + consumer.addListener(listener) + assertThat(consumer.countListeners()).isEqualTo(1) + } + + @Test + fun givenEventConsumer_whenAdding1ListenerAndRemoving1_listenerCountIs0() { + consumer.addListener(listener) + coEvery { listener.id() } returns "L1" + consumer.removeListener(listener) + assertThat(consumer.countListeners()).isEqualTo(0) + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/ConsumerObject.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/ConsumerObject.kt new file mode 100644 index 000000000..6c9911c98 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/ConsumerObject.kt @@ -0,0 +1,3 @@ +package co.nilin.opex.accountant.ports.kafka.listener.consumer + +class ConsumerObject : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/ListenerObject.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/ListenerObject.kt new file mode 100644 index 000000000..d99cb8469 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/ListenerObject.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.accountant.ports.kafka.listener.consumer + +import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener +import org.slf4j.LoggerFactory + +class ListenerObject : Listener { + + private val logger = LoggerFactory.getLogger(ListenerObject::class.java) + + override fun id(): String { + return "AnyListener" + } + + override fun onEvent(event: Any, partition: Int, offset: Long, timestamp: Long) { + logger.info("event called") + } +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt index 92b5a6138..1090e0859 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt @@ -16,7 +16,9 @@ interface FinancialActionRepository : ReactiveCrudRepository @Query("select count(1) from fi_actions fi where fi.sender = :uuid and fi.symbol = :symbol and fi.event_type = :eventType and fi.status = :status") @@ -29,4 +31,10 @@ interface FinancialActionRepository : ReactiveCrudRepository + + @Query("update fi_actions set status = :status where id = :id") + fun updateStatus(@Param("id") id: Long, @Param("status") status: FinancialActionStatus) + + @Query("update fi_actions set status = :status, retry_count = retry_count + 1 where id = :id") + fun updateStatusAndIncreaseRetry(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt index cc0e9cd63..643a6f80e 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt @@ -21,17 +21,24 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe return financialActionRepository.findByStatus( FinancialActionStatus.CREATED.name, PageRequest.of(offset.toInt(), size.toInt(), Sort.by(Sort.Direction.ASC, "createDate")) - ).map { fim -> - loadFinancialAction(fim.id)!! - }.toList() + ).map { loadFinancialAction(it.id)!! } + .toList() } override suspend fun findLast(uuid: String, ouid: String): FinancialAction? { return financialActionRepository.findByOuidAndUuid( ouid, uuid, PageRequest.of(0, 1, Sort.by(Sort.Direction.DESC, "createDate")) - ).map { fim -> - loadFinancialAction(fim.id) - }.firstOrNull() + ).map { loadFinancialAction(it.id) } + .firstOrNull() + } + + override suspend fun countUnprocessed(uuid: String, symbol: String, eventType: String): Long { + return financialActionRepository.findByUuidAndSymbolAndEventTypeAndStatus( + uuid, + symbol, + eventType, + FinancialActionStatus.CREATED + ).awaitFirstOrElse { BigDecimal.ZERO }.toLong() } private suspend fun loadFinancialAction(id: Long?): FinancialAction? { @@ -54,14 +61,4 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe } return null } - - override suspend fun countUnprocessed(uuid: String, symbol: String, eventType: String): Long { - return financialActionRepository.findByUuidAndSymbolAndEventTypeAndStatus( - uuid, - symbol, - eventType, - FinancialActionStatus.CREATED - ).awaitFirstOrElse { BigDecimal.ZERO } - .toLong() - } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt index 4f5fb4528..fb82e954d 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt @@ -5,14 +5,12 @@ import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.spi.FinancialActionPersister import co.nilin.opex.accountant.ports.postgres.dao.FinancialActionRepository import co.nilin.opex.accountant.ports.postgres.model.FinancialActionModel -import kotlinx.coroutines.reactive.awaitFirst -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactive.awaitLast +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle import org.springframework.stereotype.Component -import java.time.LocalDateTime @Component -class FinancialActionPersisterImpl(val financialActionRepository: FinancialActionRepository) : +class FinancialActionPersisterImpl(private val financialActionRepository: FinancialActionRepository) : FinancialActionPersister { override suspend fun persist(financialActions: List): List { @@ -32,33 +30,11 @@ class FinancialActionPersisterImpl(val financialActionRepository: FinancialActio "", it.createDate ) - }).awaitLast() + }).collectList().awaitSingle() return financialActions } override suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) { - val existing = financialActionRepository.findById(financialAction.id!!).awaitFirstOrElse { - throw IllegalArgumentException() - } - financialActionRepository.save( - FinancialActionModel( - existing.id, - existing.parentId, - existing.eventType, - existing.pointer, - existing.symbol, - existing.amount, - existing.sender, - existing.senderWalletType, - existing.receiver, - existing.receiverWalletType, - existing.agent, - existing.ip, - existing.createDate, - status, - 1 + existing.retryCount, - LocalDateTime.now() - ) - ).awaitFirst() + financialActionRepository.updateStatusAndIncreaseRetry(financialAction.id!!, status).awaitFirstOrNull() } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/OrderPersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/OrderPersisterImpl.kt index 3f6b177fe..7d70b9a9b 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/OrderPersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/OrderPersisterImpl.kt @@ -9,7 +9,7 @@ import org.springframework.stereotype.Component import java.time.LocalDateTime @Component -class OrderPersisterImpl(val orderRepository: OrderRepository) : OrderPersister { +class OrderPersisterImpl(private val orderRepository: OrderRepository) : OrderPersister { override suspend fun load(ouid: String): Order? { val model = orderRepository.findByOuid(ouid).awaitFirstOrNull() ?: return null diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/PairConfigLoaderImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/PairConfigLoaderImpl.kt index 9474c5559..89443d0e0 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/PairConfigLoaderImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/PairConfigLoaderImpl.kt @@ -39,11 +39,11 @@ class PairConfigLoaderImpl( } override suspend fun load(pair: String, direction: OrderDirection, userLevel: String): PairFeeConfig { - val pairConfig = pairConfigRepository - .findById(pair).awaitFirstOrElse { - val error = OpexError.InvalidPair - throw OpexException(error, String.format(error.message!!, pair)) - } + val pairConfig = pairConfigRepository.findById(pair).awaitFirstOrElse { + val error = OpexError.InvalidPair + throw OpexException(error, String.format(error.message!!, pair)) + } + var pairFeeConfig: PairFeeConfigModel? if (userLevel.isEmpty()) { pairFeeConfig = pairFeeConfigRepository @@ -73,7 +73,11 @@ class PairConfigLoaderImpl( pairConfig.rightSideWalletSymbol, pairConfig.leftSideFraction, pairConfig.rightSideFraction - ), pairFeeConfig!!.direction, pairFeeConfig.userLevel, pairFeeConfig.makerFee, pairFeeConfig.takerFee + ), + pairFeeConfig!!.direction, + pairFeeConfig.userLevel, + pairFeeConfig.makerFee, + pairFeeConfig.takerFee ) } diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/TempEventPersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/TempEventPersisterImpl.kt index 2dbd04b47..bba689912 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/TempEventPersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/TempEventPersisterImpl.kt @@ -9,30 +9,31 @@ import com.google.gson.Gson import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirstOrNull -import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.data.domain.PageRequest import org.springframework.data.domain.Sort import org.springframework.stereotype.Component import java.time.LocalDateTime @Component -class TempEventPersisterImpl(val tempEventRepository: TempEventRepository) : TempEventPersister { +class TempEventPersisterImpl(private val tempEventRepository: TempEventRepository) : TempEventPersister { override suspend fun saveTempEvent(ouid: String, event: CoreEvent) { tempEventRepository.save( TempEventModel( - null, ouid, event.javaClass.name, - Gson().toJson(event), LocalDateTime.now() + null, + ouid, + event.javaClass.name, + Gson().toJson(event), + LocalDateTime.now() ) - ).awaitSingle() + ).awaitSingleOrNull() } override suspend fun loadTempEvents(ouid: String): List { return tempEventRepository .findByOuid(ouid) - .map { value: TempEventModel -> - Gson().fromJson(value.eventBody, Class.forName(value.eventType)) as CoreEvent - } + .map { Gson().fromJson(it.eventBody, Class.forName(it.eventType)) as CoreEvent } .toList() } @@ -41,25 +42,18 @@ class TempEventPersisterImpl(val tempEventRepository: TempEventRepository) : Tem } override suspend fun removeTempEvents(tempEvents: List) { - tempEventRepository.deleteAll(tempEvents.map { event -> - TempEventModel( - event.id, event.ouid, event.eventBody.javaClass.name, - "", event.eventDate - ) - }).awaitFirstOrNull() + tempEventRepository.deleteAllById(tempEvents.map { it.id }).awaitFirstOrNull() } - override suspend fun fetchTempEvents( - offset: Long, - size: Long - ): List { + override suspend fun fetchTempEvents(offset: Long, size: Long): List { return tempEventRepository .findAll(PageRequest.of(offset.toInt(), size.toInt(), Sort.by(Sort.Direction.ASC, "eventDate"))) - .map { value: TempEventModel -> + .map { TempEvent( - value.id!!, value.ouid, Gson().fromJson(value.eventBody, Class.forName(value.eventType)) - as - CoreEvent, value.eventDate + it.id!!, + it.ouid, + Gson().fromJson(it.eventBody, Class.forName(it.eventType)) as CoreEvent, + it.eventDate ) } .toList() diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt index 734e4e660..e7f5ac076 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/FinancialActionModel.kt @@ -8,7 +8,7 @@ import java.math.BigDecimal import java.time.LocalDateTime @Table("fi_actions") -class FinancialActionModel( +data class FinancialActionModel( @Id var id: Long?, @Column("parent_id") var parentId: Long?, @Column("event_type") val eventType: String, diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/OrderModel.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/OrderModel.kt index 1274516d5..5858767ee 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/OrderModel.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/OrderModel.kt @@ -10,7 +10,7 @@ import java.math.BigDecimal import java.time.LocalDateTime @Table("orders") -class OrderModel( +data class OrderModel( @Id var id: Long?, val ouid: String, val uuid: String, diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/PairFeeConfigModel.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/PairFeeConfigModel.kt index 346fd5336..cd46a2abc 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/PairFeeConfigModel.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/PairFeeConfigModel.kt @@ -5,7 +5,7 @@ import org.springframework.data.relational.core.mapping.Table import java.math.BigDecimal @Table("pair_fee_config") -class PairFeeConfigModel( +data class PairFeeConfigModel( val id: Long?, @Column("pair_config_id") val pairConfigId: String, @Column("direction") val direction: String, diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt new file mode 100644 index 000000000..5813cb072 --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/FAPersisterImplTest.kt @@ -0,0 +1,41 @@ +package co.nilin.opex.accountant.ports.postgres + +import co.nilin.opex.accountant.core.model.FinancialActionStatus +import co.nilin.opex.accountant.ports.postgres.dao.FinancialActionRepository +import co.nilin.opex.accountant.ports.postgres.impl.FinancialActionPersisterImpl +import co.nilin.opex.accountant.ports.postgres.model.FinancialActionModel +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.mockk +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +@Suppress("ReactiveStreamsUnusedPublisher") +class FAPersisterImplTest { + + private val financialActionRepository = mockk { + coEvery { saveAll(any() as Iterable) } returns Flux.just(Valid.faModel) + coEvery { updateStatusAndIncreaseRetry(any(), any()) } returns Mono.empty() + } + private val faPersister = FinancialActionPersisterImpl(financialActionRepository) + + @Test + fun givenListOfActions_whenSaving_callSaveAll(): Unit = runBlocking { + faPersister.persist(listOf(Valid.fa)) + coVerify { financialActionRepository.saveAll(eq(listOf(Valid.faModel))) } + } + + @Test + fun givenFAAndStatus_whenUpdatingStatusAndFANotFound_throwException(): Unit = runBlocking { + faPersister.updateStatus(Valid.fa, FinancialActionStatus.CREATED) + coVerify { + financialActionRepository.updateStatusAndIncreaseRetry( + eq(Valid.fa.id!!), + eq(FinancialActionStatus.CREATED) + ) + } + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/OrderPersisterImplTest.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/OrderPersisterImplTest.kt new file mode 100644 index 000000000..52260d5d0 --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/OrderPersisterImplTest.kt @@ -0,0 +1,54 @@ +package co.nilin.opex.accountant.ports.postgres + +import co.nilin.opex.accountant.ports.postgres.dao.OrderRepository +import co.nilin.opex.accountant.ports.postgres.impl.OrderPersisterImpl +import io.mockk.coVerify +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import reactor.core.publisher.Mono + +@Suppress("ReactiveStreamsUnusedPublisher") +class OrderPersisterImplTest { + + private val repository = mockk { + every { save(any()) } returns Mono.just(Valid.orderModel) + every { findByOuid(any()) } returns Mono.just(Valid.orderModel) + } + + private val persister = OrderPersisterImpl(repository) + + @Test + fun givenOUID_whenLoading_resultNotNull(): Unit = runBlocking { + val order = persister.load(Valid.orderModel.ouid) + assertThat(order).isNotNull + } + + @Test + fun givenOUID_whenLoading_resultIsValidOrder(): Unit = runBlocking { + val order = persister.load(Valid.orderModel.ouid)!! + coVerify { repository.findByOuid(eq(Valid.orderModel.ouid)) } + assertThat(order.status).isEqualTo(Valid.orderModel.status) + assertThat(order.matchingEngineId).isEqualTo(Valid.orderModel.matchingEngineId) + assertThat(order.direction).isEqualTo(Valid.orderModel.direction) + assertThat(order.filledQuantity).isEqualTo(Valid.orderModel.filledQuantity) + assertThat(order.ouid).isEqualTo(Valid.orderModel.ouid) + } + + @Test + fun givenNewOrder_whenSaving_saveAndReturnValidOrder(): Unit = runBlocking { + val newOrder = persister.save(Valid.order) + coVerify { repository.save(any()) } + with(Valid.order) { + assertThat(newOrder).isNotNull + assertThat(status).isEqualTo(Valid.orderModel.status) + assertThat(matchingEngineId).isEqualTo(Valid.orderModel.matchingEngineId) + assertThat(direction).isEqualTo(Valid.orderModel.direction) + assertThat(filledQuantity).isEqualTo(Valid.orderModel.filledQuantity) + assertThat(ouid).isEqualTo(Valid.orderModel.ouid) + } + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/PairConfigLoaderTest.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/PairConfigLoaderTest.kt new file mode 100644 index 000000000..73da00a7f --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/PairConfigLoaderTest.kt @@ -0,0 +1,141 @@ +package co.nilin.opex.accountant.ports.postgres + +import co.nilin.opex.accountant.ports.postgres.dao.PairConfigRepository +import co.nilin.opex.accountant.ports.postgres.dao.PairFeeConfigRepository +import co.nilin.opex.accountant.ports.postgres.impl.PairConfigLoaderImpl +import co.nilin.opex.matching.engine.core.model.OrderDirection +import co.nilin.opex.utility.error.data.OpexException +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +@Suppress("ReactiveStreamsUnusedPublisher") +class PairConfigLoaderTest { + + private val pairConfigRepository = mockk { + every { findAll() } returns Flux.just(Valid.pairConfigModel, Valid.pairConfigModel) + every { findById(any() as String) } returns Mono.just(Valid.pairConfigModel) + } + private val pairFeeConfigRepository = mockk { + every { findAll() } returns Flux.just(Valid.pairFeeConfigModel, Valid.pairFeeConfigModel) + every { findByPairAndDirectionAndUserLevel(any(), any(), any()) } returns Mono.just(Valid.pairFeeConfigModel) + } + private val pairConfigLoader = PairConfigLoaderImpl(pairConfigRepository, pairFeeConfigRepository) + + @Test + fun givenPairConfigs_whenListNotEmpty_resultIsNotEmptyAndValid(): Unit = runBlocking { + val configs = pairConfigLoader.loadPairConfigs() + assertThat(configs.size).isEqualTo(2) + with(configs[1]) { + assertThat(pair).isEqualTo(Valid.pairConfig.pair) + assertThat(leftSideWalletSymbol).isEqualTo(Valid.pairConfig.leftSideWalletSymbol) + assertThat(rightSideWalletSymbol).isEqualTo(Valid.pairConfig.rightSideWalletSymbol) + assertThat(leftSideFraction).isEqualTo(Valid.pairConfig.leftSideFraction) + assertThat(rightSideFraction).isEqualTo(Valid.pairConfig.rightSideFraction) + } + } + + @Test + fun givenPairFeeConfigs_whenListNotEmpty_resultIsNotEmptyAndValid(): Unit = runBlocking { + val configs = pairConfigLoader.loadPairFeeConfigs() + assertThat(configs.size).isEqualTo(2) + with(configs[1]) { + assertThat(pairConfig.pair).isEqualTo(Valid.pairConfig.pair) + assertThat(userLevel).isEqualTo(Valid.pairFeeConfigModel.userLevel) + assertThat(direction).isEqualTo(Valid.pairFeeConfigModel.direction) + assertThat(makerFee).isEqualTo(Valid.pairFeeConfigModel.makerFee) + assertThat(takerFee).isEqualTo(Valid.pairFeeConfigModel.takerFee) + } + } + + @Test + fun givenPairDirectionUserLevel_whenPairConfigNotFound_throwsException(): Unit = runBlocking { + every { pairConfigRepository.findById(any() as String) } returns Mono.empty() + assertThatThrownBy { + runBlocking { pairConfigLoader.load("BTC_USDT", OrderDirection.BID, "*") } + }.isInstanceOf(OpexException::class.java) + } + + @Test + fun givenPairDirection_whenUserLevelEmpty_loadWithDefaultUserLevel(): Unit = runBlocking { + val pair = pairConfigLoader.load("BTC_USDT", OrderDirection.BID, "") + assertThat(pair).isNotNull + verify(exactly = 1) { + pairFeeConfigRepository.findByPairAndDirectionAndUserLevel( + eq("BTC_USDT"), + eq(OrderDirection.BID), + eq("*") + ) + } + } + + @Test + fun givenPairDirection_whenPairFeeConfigNotFound_throwsException(): Unit = runBlocking { + every { + pairFeeConfigRepository.findByPairAndDirectionAndUserLevel(any(), any(), eq("*")) + } returns Mono.empty() + + assertThatThrownBy { + runBlocking { pairConfigLoader.load("BTC_USDT", OrderDirection.BID, "") } + }.isInstanceOf(OpexException::class.java) + } + + @Test + fun givenPairDirectionUserLevel_whenPairFeeConfigNotFound_loadWithDefaultUserLevel(): Unit = runBlocking { + every { + pairFeeConfigRepository.findByPairAndDirectionAndUserLevel(any(), any(), eq("1")) + } returns Mono.empty() + + val pair = pairConfigLoader.load("BTC_USDT", OrderDirection.BID, "1") + assertThat(pair).isNotNull + verify(exactly = 1) { + pairFeeConfigRepository.findByPairAndDirectionAndUserLevel( + eq("BTC_USDT"), + eq(OrderDirection.BID), + eq("*") + ) + } + } + + @Test + fun givenPairDirectionUserLevel_whenPairFeeConfigNotFoundWithActualAndDefaultUserLevel_throwsException(): Unit = + runBlocking { + every { + pairFeeConfigRepository.findByPairAndDirectionAndUserLevel(any(), any(), eq("1")) + } returns Mono.empty() + + every { + pairFeeConfigRepository.findByPairAndDirectionAndUserLevel(any(), any(), eq("*")) + } returns Mono.empty() + + assertThatThrownBy { + runBlocking { pairConfigLoader.load("BTC_USDT", OrderDirection.BID, "1") } + }.isInstanceOf(OpexException::class.java) + } + + @Test + fun givenPairDirection_whenPairConfigNotFound_throwException(): Unit = runBlocking { + every { pairConfigRepository.findById(any() as String) } returns Mono.empty() + assertThatThrownBy { + runBlocking { pairConfigLoader.load("BTC_USDT", OrderDirection.BID) } + }.isInstanceOf(OpexException::class.java) + } + + @Test + fun givenPairDirection_whenConfigLoaded_returnValidPairConfig(): Unit = runBlocking { + with(pairConfigLoader.load("BTC_USDT", OrderDirection.BID)) { + assertThat(pair).isEqualTo(Valid.pairConfigModel.pair) + assertThat(leftSideWalletSymbol).isEqualTo(Valid.pairConfigModel.leftSideWalletSymbol) + assertThat(rightSideWalletSymbol).isEqualTo(Valid.pairConfigModel.rightSideWalletSymbol) + assertThat(rightSideFraction).isEqualTo(Valid.pairConfigModel.rightSideFraction) + assertThat(leftSideFraction).isEqualTo(Valid.pairConfigModel.leftSideFraction) + } + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/TempEventPersisterTest.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/TempEventPersisterTest.kt new file mode 100644 index 000000000..d0933fdee --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/TempEventPersisterTest.kt @@ -0,0 +1,55 @@ +package co.nilin.opex.accountant.ports.postgres + +import co.nilin.opex.accountant.ports.postgres.dao.TempEventRepository +import co.nilin.opex.accountant.ports.postgres.impl.TempEventPersisterImpl +import co.nilin.opex.accountant.ports.postgres.model.TempEventModel +import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import reactor.core.publisher.Mono + +@Suppress("ReactiveStreamsUnusedPublisher") +class TempEventPersisterTest { + + private val tempEventRepository = mockk { + every { save(any()) } returns Mono.empty() + every { findAll(any()) } returns flow { emit(Valid.tempEventModel) } + every { findByOuid(any()) } returns flow { emit(Valid.tempEventModel) } + every { delete(any()) } returns Mono.empty() + every { deleteAll(any() as Iterable) } returns Mono.empty() + every { deleteByOuid(any()) } returns Mono.empty() + } + + private val persister = TempEventPersisterImpl(tempEventRepository) + + @Test + fun givenOuidAndEvent_whenSaving_callRepoOnce(): Unit = runBlocking { + persister.saveTempEvent("event_1", Valid.testEvent) + verify(exactly = 1) { tempEventRepository.save(any()) } + } + + @Test + fun givenOUID_whenLoadingTempEvent_parseEventJSON(): Unit = runBlocking { + val events = persister.loadTempEvents("event_1") + + assertThat(events).isNotEmpty + + with(events[0]) { + assertThat(this).isInstanceOf(CoreEvent::class.java) + assertThat(pair.rightSideName).isEqualTo(Valid.testEvent.rightSidePair) + assertThat(pair.leftSideName).isEqualTo(Valid.testEvent.leftSidePair) + } + } + + @Test + fun givenOuid_whenDeletingByOUID_callRepoOnce(): Unit = runBlocking { + persister.removeTempEvents(Valid.tempEvent.ouid) + verify(exactly = 1) { tempEventRepository.deleteByOuid(eq(Valid.tempEvent.ouid)) } + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt new file mode 100644 index 000000000..9931a14ff --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/test/kotlin/co/nilin/opex/accountant/ports/postgres/Valid.kt @@ -0,0 +1,151 @@ +package co.nilin.opex.accountant.ports.postgres + +import co.nilin.opex.accountant.core.inout.OrderStatus +import co.nilin.opex.accountant.core.model.FinancialAction +import co.nilin.opex.accountant.core.model.Order +import co.nilin.opex.accountant.core.model.PairConfig +import co.nilin.opex.accountant.core.model.TempEvent +import co.nilin.opex.accountant.ports.postgres.model.* +import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent +import co.nilin.opex.matching.engine.core.model.MatchConstraint +import co.nilin.opex.matching.engine.core.model.OrderDirection +import co.nilin.opex.matching.engine.core.model.OrderType +import co.nilin.opex.matching.engine.core.model.Pair +import java.math.BigDecimal +import java.time.LocalDateTime + +object Valid { + + private val currentTime = LocalDateTime.now() + + val orderModel = OrderModel( + 1, + "order_1", + "user_1", + "BTC_USDT", + 1, + 0.01.toBigDecimal(), + 0.01.toBigDecimal(), + 0.00001.toBigDecimal(), + 0.01.toBigDecimal(), + "", + OrderDirection.BID, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER, + 5500000, + 100, + 100, + 55000.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + BigDecimal.ZERO, + BigDecimal.ZERO, + OrderStatus.FILLED.code, + "", + "", + currentTime + ) + + val order = Order( + "BTC_USDT", + "order_1", + 1, + 0.01.toBigDecimal(), + 0.01.toBigDecimal(), + 0.00001.toBigDecimal(), + 0.01.toBigDecimal(), + "user_1", + "", + OrderDirection.BID, + MatchConstraint.GTC, + OrderType.LIMIT_ORDER, + 5500000, + 100, + 100, + 55000.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + BigDecimal.ZERO, + BigDecimal.ZERO, + OrderStatus.FILLED.code, + 1 + ) + + val pairConfigModel = PairConfigModel( + "BTC_USDT", + "BTC", + "USDT", + 0.000001.toBigDecimal(), + 0.01.toBigDecimal() + ) + + val pairConfig = PairConfig( + "BTC_USDT", + "BTC", + "USDT", + 0.000001.toBigDecimal(), + 0.01.toBigDecimal() + ) + + val pairFeeConfigModel = PairFeeConfigModel( + 1, + "BTC_USDT", + "BID", + "1", + 0.01.toBigDecimal(), + 0.01.toBigDecimal() + ) + + class TestCoreEvent(val leftSidePair: String, val rightSidePair: String) : + CoreEvent(Pair(leftSidePair, rightSidePair), currentTime) + + val testEvent = TestCoreEvent("BTC", "USDT") + + val tempEvent = TempEvent( + 1, + "event_1", + TestCoreEvent("BTC", "USDT"), + currentTime + ) + + val tempEventModel = TempEventModel( + 1, + "event_1", + TestCoreEvent::class.java.name, + "{\"leftSidePair\":\"BTC\",\"rightSidePair\":\"USDT\",\"pair\":{\"leftSideName\":\"BTC\",\"rightSideName\":\"USDT\"},\"eventDate\":{\"date\":{\"year\":2022,\"month\":5,\"day\":30},\"time\":{\"hour\":16,\"minute\":57,\"second\":44,\"nano\":809838600}}}", + currentTime + ) + + val fa = FinancialAction( + null, + TradeEvent::class.java.name, + "trade_id", + "BTC_USDT", + 10000.0.toBigDecimal(), + "user_parent", + "main", + "system", + "main", + currentTime, + 1, + 1 + ) + + val faModel = FinancialActionModel( + null, + null, + TradeEvent::class.java.name, + "trade_id", + "BTC_USDT", + 10000.0.toBigDecimal(), + "user_parent", + "main", + "system", + "main", + "", + "", + currentTime + ) + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/EventPublisher.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/EventPublisher.kt new file mode 100644 index 000000000..afd662665 --- /dev/null +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/EventPublisher.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.accountant.ports.kafka.submitter.service + +interface EventPublisher { + + val topic: String + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt index 0a93b1b2c..aaeabb34a 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt @@ -11,20 +11,24 @@ import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @Component -class RichOrderSubmitter(@Qualifier("richOrderKafkaTemplate") val kafkaTemplate: KafkaTemplate) : - RichOrderPublisher { +class RichOrderSubmitter( + @Qualifier("richOrderKafkaTemplate") + private val kafkaTemplate: KafkaTemplate, +) : RichOrderPublisher, EventPublisher { private val logger = LoggerFactory.getLogger(RichOrderSubmitter::class.java) + override val topic = "richOrder" + override suspend fun publish(order: RichOrderEvent): Unit = suspendCoroutine { cont -> logger.info("Submitting RichOrder") - val sendFuture = kafkaTemplate.send("richOrder", order) + val sendFuture = kafkaTemplate.send(topic, order) sendFuture.addCallback({ cont.resume(Unit) }, { logger.error("Error submitting RichOrder", it) - cont.resume(Unit) + cont.resumeWithException(it) }) } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt index c5f3641af..e3d7554db 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt @@ -11,20 +11,25 @@ import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @Component -class RichTradeSubmitter(@Qualifier("richTradeKafkaTemplate") val kafkaTemplate: KafkaTemplate) : - RichTradePublisher { +class RichTradeSubmitter( + @Qualifier("richTradeKafkaTemplate") + private val kafkaTemplate: KafkaTemplate +) : RichTradePublisher, EventPublisher { private val logger = LoggerFactory.getLogger(RichTradeSubmitter::class.java) + override val topic = "richTrade" + override suspend fun publish(trade: RichTrade): Unit = suspendCoroutine { cont -> logger.info("Submitting RichTrade event: id=${trade.id}") - val sendFuture = kafkaTemplate.send("richTrade", trade) + val sendFuture = kafkaTemplate.send(topic, trade) sendFuture.addCallback({ cont.resume(Unit) }, { logger.error("RichTrade submitter error", it) - cont.resume(Unit) + cont.resumeWithException(it) }) } + } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt index 09c184248..f2630016a 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt @@ -11,16 +11,20 @@ import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @Component -class TempEventSubmitter(@Qualifier("accountantEventKafkaTemplate") val kafkaTemplate: KafkaTemplate) : - TempEventRepublisher { +class TempEventSubmitter( + @Qualifier("accountantEventKafkaTemplate") + private val kafkaTemplate: KafkaTemplate +) : TempEventRepublisher, EventPublisher { private val logger = LoggerFactory.getLogger(TempEventSubmitter::class.java) + override val topic = "tempevents" + override suspend fun republish(events: List): Unit = suspendCoroutine { cont -> logger.info("Submitting TempEvents") events.forEach { event -> - val sendFuture = kafkaTemplate.send("tempevents", event) + val sendFuture = kafkaTemplate.send(topic, event) sendFuture.addCallback({ cont.resume(Unit) }, { diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/EventPublishersTest.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/EventPublishersTest.kt new file mode 100644 index 000000000..987550f39 --- /dev/null +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/EventPublishersTest.kt @@ -0,0 +1,98 @@ +package co.nilin.opex.accountant.ports.kafka.submitter + +import co.nilin.opex.accountant.core.inout.RichOrderEvent +import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.accountant.ports.kafka.submitter.service.RichOrderSubmitter +import co.nilin.opex.accountant.ports.kafka.submitter.service.RichTradeSubmitter +import co.nilin.opex.accountant.ports.kafka.submitter.service.TempEventSubmitter +import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.SendResult +import org.springframework.util.concurrent.SettableListenableFuture + +class EventPublishersTest { + + private val richOrderTemplate = mockk>() + private val richOrderSubmitter = RichOrderSubmitter(richOrderTemplate) + + private val richTradeTemplate = mockk>() + private val richTradeSubmitter = RichTradeSubmitter(richTradeTemplate) + + private val tempEventTemplate = mockk>() + private val tempEventSubmitter = TempEventSubmitter(tempEventTemplate) + + init { + every { richOrderTemplate.send(any(), any()) } returns Valid.kafkaSendFuture() + every { richTradeTemplate.send(any(), any()) } returns Valid.kafkaSendFuture() + every { tempEventTemplate.send(any(), any()) } returns Valid.kafkaSendFuture() + } + + @Test + fun givenSubmitters_validateTopics(): Unit = runBlocking { + assertThat(richOrderSubmitter.topic).isEqualTo("richOrder") + assertThat(richTradeSubmitter.topic).isEqualTo("richTrade") + assertThat(tempEventSubmitter.topic).isEqualTo("tempevents") + } + + @Test + fun givenRichOrderSubmitter_whenKafkaFailsToSend_throwException(): Unit = runBlocking { + val future = SettableListenableFuture>() + every { richOrderTemplate.send(any(), any()) } returns future + + future.setException(IllegalStateException("mock")) + + Assertions.assertThatThrownBy { + runBlocking { richOrderSubmitter.publish(Valid.testRichOrder) } + }.isInstanceOfAny(Throwable::class.java) + } + + @Test + fun givenRichTradeSubmitter_whenKafkaFailsToSend_throwException(): Unit = runBlocking { + val future = SettableListenableFuture>() + every { richTradeTemplate.send(any(), any()) } returns future + + future.setException(IllegalStateException("mock")) + + Assertions.assertThatThrownBy { + runBlocking { richTradeSubmitter.publish(Valid.richTrade) } + }.isInstanceOfAny(Throwable::class.java) + } + + @Test + fun givenTempEventSubmitter_whenKafkaFailsToSend_throwException(): Unit = runBlocking { + val future = SettableListenableFuture>() + every { tempEventTemplate.send(any(), any()) } returns future + + future.setException(IllegalStateException("mock")) + + Assertions.assertThatThrownBy { + runBlocking { tempEventSubmitter.republish(listOf(Valid.testCoreEvent)) } + }.isInstanceOfAny(Throwable::class.java) + } + + @Test + fun givenRichOrderSubmitter_whenPublish_callSendWithCorrectTopic(): Unit = runBlocking { + richOrderSubmitter.publish(Valid.testRichOrder) + verify { richOrderTemplate.send(eq(richOrderSubmitter.topic), eq(Valid.testRichOrder)) } + } + + @Test + fun givenTradeOrderSubmitter_whenPublish_callSendWithCorrectTopic(): Unit = runBlocking { + richTradeSubmitter.publish(Valid.richTrade) + verify { richTradeTemplate.send(eq(richTradeSubmitter.topic), eq(Valid.richTrade)) } + } + + @Test + fun givenTempEventSubmitter_whenRepublish_callSendForEachEventWithCorrectTopic(): Unit = runBlocking { + tempEventSubmitter.republish(listOf(Valid.testCoreEvent, Valid.testCoreEvent)) + verify(exactly = 2) { tempEventTemplate.send(eq(tempEventSubmitter.topic), eq(Valid.testCoreEvent)) } + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/Valid.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/Valid.kt new file mode 100644 index 000000000..90356d53d --- /dev/null +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/test/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/Valid.kt @@ -0,0 +1,52 @@ +package co.nilin.opex.accountant.ports.kafka.submitter + +import co.nilin.opex.accountant.core.inout.RichOrderEvent +import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import co.nilin.opex.matching.engine.core.model.OrderDirection +import co.nilin.opex.matching.engine.core.model.Pair +import org.springframework.kafka.support.SendResult +import org.springframework.util.concurrent.SettableListenableFuture +import java.time.LocalDateTime + +object Valid { + + class TestRichOrderEvent : RichOrderEvent + class TestCoreEvent : CoreEvent(Pair("BTC", "USDT")) + + val currentTime = LocalDateTime.now() + + val testRichOrder = TestRichOrderEvent() + val testCoreEvent = TestCoreEvent() + val richTrade = RichTrade( + 1, + "BTC_USDT", + "", + "", + 1, + OrderDirection.BID, + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + "", + "", + "", + 1, + OrderDirection.BID, + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + 1.0.toBigDecimal(), + "", + 1.0.toBigDecimal(), + currentTime + ) + + fun kafkaSendFuture() = SettableListenableFuture>().apply { + set(SendResult(null, null)) + } + +} \ No newline at end of file diff --git a/accountant/pom.xml b/accountant/pom.xml index 01387a319..0b62a1582 100644 --- a/accountant/pom.xml +++ b/accountant/pom.xml @@ -29,6 +29,11 @@ org.springframework.boot spring-boot-starter-test + + io.mockk + mockk + test +