Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,31 +104,31 @@ class AppConfig {

@Autowired
fun configureOrderListener(orderKafkaListener: OrderKafkaListener, orderListener: OrderListener) {
orderKafkaListener.addOrderListener(orderListener)
orderKafkaListener.addListener(orderListener)
}

@Autowired
fun configureTradeListener(
tradeKafkaListener: TradeKafkaListener,
accountantTradeListener: AccountantTradeListener
) {
tradeKafkaListener.addTradeListener(accountantTradeListener)
tradeKafkaListener.addListener(accountantTradeListener)
}

@Autowired
fun configureEventListener(
eventKafkaListener: EventKafkaListener,
accountantEventListener: AccountantEventListener
) {
eventKafkaListener.addEventListener(accountantEventListener)
eventKafkaListener.addListener(accountantEventListener)
}

@Autowired
fun configureTempEventListener(
tempEventKafkaListener: TempEventKafkaListener,
accountantTempEventListener: AccountantTempEventListener
) {
tempEventKafkaListener.addEventListener(accountantTempEventListener)
tempEventKafkaListener.addListener(accountantTempEventListener)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ class AccountantEventListener(private val orderManager: OrderManager) : EventLis
return "EventListener"
}

override fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) {
override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) {
runBlocking {
when (coreEvent) {
is CreateOrderEvent -> orderManager.handleNewOrder(coreEvent)
is RejectOrderEvent -> orderManager.handleRejectOrder(coreEvent)
is UpdatedOrderEvent -> orderManager.handleUpdateOrder(coreEvent)
is CancelOrderEvent -> orderManager.handleCancelOrder(coreEvent)
when (event) {
is CreateOrderEvent -> orderManager.handleNewOrder(event)
is RejectOrderEvent -> orderManager.handleRejectOrder(event)
is UpdatedOrderEvent -> orderManager.handleUpdateOrder(event)
is CancelOrderEvent -> orderManager.handleCancelOrder(event)
else -> {
println("Event is not accepted ${coreEvent::class.java}")
println("Event is not accepted ${event::class.java}")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ class AccountantTempEventListener(
return "TempEventListener"
}

override fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) {
println("TempEvent $coreEvent")
override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) {
println("TempEvent $event")
runBlocking {
when (coreEvent) {
is CreateOrderEvent -> orderManager.handleNewOrder(coreEvent)
is RejectOrderEvent -> orderManager.handleRejectOrder(coreEvent)
is UpdatedOrderEvent -> orderManager.handleUpdateOrder(coreEvent)
is CancelOrderEvent -> orderManager.handleCancelOrder(coreEvent)
is TradeEvent -> tradeManager.handleTrade(coreEvent)
when (event) {
is CreateOrderEvent -> orderManager.handleNewOrder(event)
is RejectOrderEvent -> orderManager.handleRejectOrder(event)
is UpdatedOrderEvent -> orderManager.handleUpdateOrder(event)
is CancelOrderEvent -> orderManager.handleCancelOrder(event)
is TradeEvent -> tradeManager.handleTrade(event)
else -> {
throw IllegalArgumentException("Event is not accepted ${coreEvent::class.java}")
throw IllegalArgumentException("Event is not accepted ${event::class.java}")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package co.nilin.opex.accountant.app.listener

import co.nilin.opex.accountant.core.api.TradeManager
import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener
import co.nilin.opex.accountant.ports.kafka.listener.spi.TradeListener
import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent
import kotlinx.coroutines.runBlocking
Expand All @@ -11,9 +12,9 @@ class AccountantTradeListener(private val tradeManager: TradeManager) : TradeLis
return "TradeListener"
}

override fun onTrade(tradeEvent: TradeEvent, partition: Int, offset: Long, timestamp: Long) {
override fun onEvent(event: TradeEvent, partition: Int, offset: Long, timestamp: Long) {
runBlocking {
tradeManager.handleTrade(tradeEvent)
tradeManager.handleTrade(event)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest
return "OrderListener"
}

override fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) {
override fun onEvent(event: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) {
runBlocking {
logger.info("Order submit event received ${order.ouid}")
logger.info("Order submit event received ${event.ouid}")

orderManager.handleRequestOrder(
SubmitOrderEvent(
order.ouid,
order.uuid,
order.orderId,
order.pair,
order.price,
order.quantity,
order.quantity,
order.direction,
order.matchConstraint,
order.orderType
event.ouid,
event.uuid,
event.orderId,
event.pair,
event.price,
event.quantity,
event.quantity,
event.direction,
event.matchConstraint,
event.orderType
)
)
}
Expand Down
13 changes: 11 additions & 2 deletions accountant/accountant-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,17 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<groupId>org.mockito.kotlin</groupId>
<artifactId>mockito-kotlin</artifactId>
</dependency>
<dependency>
<groupId>io.mockk</groupId>
<artifactId>mockk</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package co.nilin.opex.accountant.core.service

import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.spi.FinancialActionLoader
import co.nilin.opex.accountant.core.spi.FinancialActionPersister
import co.nilin.opex.accountant.core.spi.WalletProxy
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.mockk
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test

class FAJobManagerImplTest {

private val financialActionLoader = mockk<FinancialActionLoader>()
private val financialActionPersister = mockk<FinancialActionPersister>()
private val walletProxy = mockk<WalletProxy>()

private val sut = FinancialActionJobManagerImpl(financialActionLoader, financialActionPersister, walletProxy)

init {
coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.fa, Valid.fa)
coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit
}

@Test
fun given2FALoaded_whenProcessing_thenVerifyThatTransferProxyCalled2Times() = runBlocking {
coEvery { walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) } returns Unit
sut.processFinancialActions(0, 2)
with(Valid.fa) {
coVerify(exactly = 2) {
walletProxy.transfer(
eq(symbol),
eq(senderWalletType),
eq(sender),
eq(receiverWalletType),
eq(receiver),
eq(amount),
eq(eventType + pointer),
any()
)
}
coVerify(exactly = 2) {
financialActionPersister.updateStatus(
eq(this@with),
eq(FinancialActionStatus.PROCESSED)
)
}
}
}

@Test
fun given2FALoaded_whenProcessingFailed_thenUpdateStatusCalledRegardless() = runBlocking {
coEvery {
walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any())
} throws IllegalStateException()

sut.processFinancialActions(0, 2)
with(Valid.fa) {
coVerify(exactly = 2) {
walletProxy.transfer(
eq(symbol),
eq(senderWalletType),
eq(sender),
eq(receiverWalletType),
eq(receiver),
eq(amount),
eq(eventType + pointer),
any()
)
}
coVerify(exactly = 2) {
financialActionPersister.updateStatus(
eq(this@with),
eq(FinancialActionStatus.CREATED)
)
}
}
}

@Test
fun given2FALoaded_whenProcessingFailedAndRetryCountExceeded_thenUpdateStatusCalledRegardless() = runBlocking {
coEvery {
walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any())
} throws IllegalStateException()

coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.faHighRetry)

sut.processFinancialActions(0, 1)
with(Valid.faHighRetry) {
coVerify(exactly = 1) {
walletProxy.transfer(
eq(symbol),
eq(senderWalletType),
eq(sender),
eq(receiverWalletType),
eq(receiver),
eq(amount),
eq(eventType + pointer),
any()
)
}
coVerify(exactly = 1) {
financialActionPersister.updateStatus(
eq(this@with),
eq(FinancialActionStatus.ERROR)
)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package co.nilin.opex.accountant.core.service

import co.nilin.opex.accountant.core.model.FinancialAction
import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.time.LocalDateTime

internal class FeeCalculatorImplTest {

private val receiverAddress = "0x0"
private val feeCalculator = FeeCalculatorImpl(receiverAddress)

@Test
fun givenTradeEventsAndOrders_whenFeeCalculated_feeActionsNotNull(): Unit = runBlocking {
val actions =
feeCalculator.createFeeActions(Valid.tradeEvent, Valid.makerOrder, Valid.takerOrder, null, null)
assertThat(actions.makerFeeAction).isNotNull
assertThat(actions.takerFeeAction).isNotNull
}

@Test
fun givenTradeEventsAndOrders_whenFeeCalculated_returnCorrectFees(): Unit = runBlocking {
val actions =
feeCalculator.createFeeActions(Valid.tradeEvent, Valid.makerOrder, Valid.takerOrder, null, null)
with(actions.makerFeeAction) {
assertThat(amount.toDouble()).isEqualTo(0.01) // 1% of 1 BTC
assertThat(symbol).isEqualTo("BTC")
assertThat(sender).isEqualTo("user_1")
assertThat(pointer).isEqualTo("order_2")
assertThat(receiver).isEqualTo(receiverAddress)
assertThat(receiverWalletType).isEqualTo("exchange")
}

with(actions.takerFeeAction) {
assertThat(amount.toDouble()).isEqualTo(500.0) // 1% of 50,000 USDT
assertThat(symbol).isEqualTo("USDT")
assertThat(sender).isEqualTo("user_2")
assertThat(pointer).isEqualTo("order_1")
assertThat(receiver).isEqualTo(receiverAddress)
assertThat(receiverWalletType).isEqualTo("exchange")
}
}

@Test
fun givenTradeEventsAndOrders_whenFAParentNotNull_thenFeeActionParentNotNull(): Unit = runBlocking {
val parentFA = FinancialAction(
null,
TradeEvent::class.java.name,
"trade_id",
"BTC_USDT",
10000.0.toBigDecimal(),
"user_parent",
"main",
"system",
"main",
Valid.currentTime
)

val actions = feeCalculator.createFeeActions(
Valid.tradeEvent,
Valid.makerOrder,
Valid.takerOrder,
parentFA,
parentFA
)
assertThat(actions.makerFeeAction.parent).isNotNull
assertThat(actions.takerFeeAction.parent).isNotNull
}

}

This file was deleted.

Loading