Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package co.nilin.opex.accountant.app.listener

import co.nilin.opex.accountant.core.api.OrderManager
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent
import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener
import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent
import kotlinx.coroutines.runBlocking
Expand All @@ -15,25 +16,26 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest
return "OrderListener"
}

override fun onEvent(event: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) {
override fun onEvent(event: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) {
runBlocking {
logger.info("Order submit event received ${event.ouid}")

orderManager.handleRequestOrder(
SubmitOrderEvent(
event.ouid,
event.uuid,
event.orderId,
event.pair,
event.price,
event.quantity,
event.quantity,
event.direction,
event.matchConstraint,
event.orderType,
event.userLevel
if (event is OrderSubmitRequestEvent) {
logger.info("Order submit event received ${event.ouid}")
orderManager.handleRequestOrder(
SubmitOrderEvent(
event.ouid,
event.uuid,
event.orderId,
event.pair,
event.price,
event.quantity,
event.quantity,
event.direction,
event.matchConstraint,
event.orderType,
event.userLevel
)
)
)
}
}
}
}
4 changes: 2 additions & 2 deletions accountant/accountant-app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
server.port: 8080
logging:
level:
co.nilin: DEBUG
reactor.netty.http.client: DEBUG
co.nilin: INFO
reactor.netty.http.client: INFO
spring:
application:
name: opex-accountant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.model.Order
import co.nilin.opex.accountant.core.spi.*
import co.nilin.opex.matching.engine.core.eventh.events.*
import co.nilin.opex.matching.engine.core.inout.RequestedOperation
import co.nilin.opex.matching.engine.core.model.OrderDirection
import org.springframework.transaction.annotation.Transactional
import java.math.BigDecimal
Expand Down Expand Up @@ -122,6 +123,9 @@ open class OrderManagerImpl(

@Transactional
override suspend fun handleRejectOrder(rejectOrderEvent: RejectOrderEvent): List<FinancialAction> {
if (rejectOrderEvent.requestedOperation != RequestedOperation.PLACE_ORDER)
return emptyList()

//order by ouid
val order = orderPersister.load(rejectOrderEvent.ouid)
if (order == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ internal class OrderManagerImplTest {
"user_1",
56,
Pair("BTC", "USDT"),
RequestedOperation.CANCEL_ORDER,
RequestedOperation.PLACE_ORDER,
RejectReason.ORDER_NOT_FOUND
)

Expand All @@ -234,6 +234,21 @@ internal class OrderManagerImplTest {
coVerify(exactly = 1) { tempEventPersister.saveTempEvent(any(), any()) }
}

@Test
fun givenRejectOrderReceived_whenOperationNotPlaceOrder_returnEmptyFA(): Unit = runBlocking {
val orderEvent = RejectOrderEvent(
"ouid",
"user_1",
56,
Pair("BTC", "USDT"),
RequestedOperation.CANCEL_ORDER,
RejectReason.ORDER_NOT_FOUND
)

val fa = orderManager.handleRejectOrder(orderEvent)
assertThat(fa.size).isEqualTo(0)
}

@Test
fun givenRejectOrderReceived_whenLocalFound_publishRichOrderUpdate(): Unit = runBlocking {
val orderEvent = RejectOrderEvent(
Expand All @@ -246,7 +261,7 @@ internal class OrderManagerImplTest {
OrderDirection.BID,
MatchConstraint.GTC,
OrderType.LIMIT_ORDER,
RequestedOperation.CANCEL_ORDER,
RequestedOperation.PLACE_ORDER,
RejectReason.ORDER_NOT_FOUND,
)
coEvery { orderPersister.load(any()) } returns Valid.order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class AccountantKafkaConfig {
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*",
JsonDeserializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest"
JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent"
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package co.nilin.opex.accountant.ports.kafka.listener.consumer

import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent
import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener
import org.springframework.stereotype.Component

@Component
class OrderKafkaListener : EventConsumer<OrderSubmitRequestListener, String, OrderSubmitRequest>()
class OrderKafkaListener : EventConsumer<OrderSubmitRequestListener, String, OrderRequestEvent>()
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package co.nilin.opex.accountant.ports.kafka.listener.inout

import co.nilin.opex.matching.engine.core.model.Pair

class OrderCancelRequestEvent(
ouid: String,
uuid: String,
pair: Pair,
val orderId: Long
) : OrderRequestEvent(ouid, uuid, pair)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package co.nilin.opex.accountant.ports.kafka.listener.inout

import co.nilin.opex.matching.engine.core.model.Pair

abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair)
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import co.nilin.opex.matching.engine.core.model.OrderDirection
import co.nilin.opex.matching.engine.core.model.OrderType
import co.nilin.opex.matching.engine.core.model.Pair

data class OrderSubmitRequest(
val ouid: String,
val uuid: String,
val orderId: Long?,
val pair: Pair,
val price: Long = 0,
val quantity: Long = 0,
class OrderSubmitRequestEvent(
ouid: String,
uuid: String,
pair: Pair,
val price: Long,
val quantity: Long,
val direction: OrderDirection,
val matchConstraint: MatchConstraint,
val orderType: OrderType,
val userLevel: String
)
val userLevel: String,
val orderId: Long? = null,
) : OrderRequestEvent(ouid, uuid, pair)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package co.nilin.opex.accountant.ports.kafka.listener.spi

import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent

interface OrderSubmitRequestListener : Listener<OrderSubmitRequest>
interface OrderSubmitRequestListener : Listener<OrderRequestEvent>
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
package co.nilin.opex.matching.engine.app.listener

import co.nilin.opex.matching.engine.app.bl.OrderBooks
import co.nilin.opex.matching.engine.core.eventh.events.CancelOrderEvent
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import co.nilin.opex.matching.engine.core.eventh.events.EditOrderRequestEvent
import co.nilin.opex.matching.engine.core.inout.OrderCancelCommand
import co.nilin.opex.matching.engine.core.inout.OrderEditCommand
import co.nilin.opex.matching.engine.ports.kafka.listener.spi.EventListener
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory

class MatchingEngineEventListener : EventListener {
Expand All @@ -20,32 +14,5 @@ class MatchingEngineEventListener : EventListener {

override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) {
logger.info("Received CoreEvent: ${event::class.java}")

runBlocking {
val orderBook = OrderBooks.lookupOrderBook("${event.pair.leftSideName}_${event.pair.rightSideName}")

when (event) {
is EditOrderRequestEvent -> orderBook.handleEditCommand(
OrderEditCommand(
event.ouid,
event.uuid,
event.orderId,
event.pair,
event.price,
event.quantity
)
)

is CancelOrderEvent -> orderBook.handleCancelCommand(
OrderCancelCommand(
event.ouid,
event.uuid,
event.orderId,
event.pair
)
)
else -> null
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,49 @@
package co.nilin.opex.matching.engine.app.listener

import co.nilin.opex.matching.engine.app.bl.OrderBooks
import co.nilin.opex.matching.engine.core.inout.OrderCreateCommand
import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest
import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderSubmitRequestListener
import co.nilin.opex.matching.engine.core.inout.*
import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderRequestEventListener
import org.slf4j.LoggerFactory

class OrderListener : OrderSubmitRequestListener {
class OrderListener : OrderRequestEventListener {

private val logger = LoggerFactory.getLogger(OrderListener::class.java)

override fun id(): String {
return "OrderListener"
}

override suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) {
override suspend fun onOrder(order: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) {
logger.info("OrderRequestEvent received. ${order::class.java.simpleName} ouid=${order.ouid}")
val orderBook = OrderBooks.lookupOrderBook(
order.pair.leftSideName + "_"
+ order.pair.rightSideName
)
orderBook.handleNewOrderCommand(
OrderCreateCommand(
order.ouid,
order.uuid,
order.pair,
order.price,
order.quantity,
order.direction,
order.matchConstraint,
order.orderType

when (order) {
is OrderSubmitRequestEvent -> orderBook.handleNewOrderCommand(
OrderCreateCommand(
order.ouid,
order.uuid,
order.pair,
order.price,
order.quantity,
order.direction,
order.matchConstraint,
order.orderType
)
)
)

is OrderCancelRequestEvent -> orderBook.handleCancelCommand(
OrderCancelCommand(
order.ouid,
order.uuid,
order.orderId,
order.pair
)
)

else -> logger.warn("Unknown event type of OrderRequestEvent")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package co.nilin.opex.matching.engine.core.inout

import co.nilin.opex.matching.engine.core.model.Pair

class OrderCancelRequestEvent(
ouid: String,
uuid: String,
pair: Pair,
val orderId: Long
) : OrderRequestEvent(ouid, uuid, pair)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package co.nilin.opex.matching.engine.core.inout

import co.nilin.opex.matching.engine.core.model.Pair

abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package co.nilin.opex.matching.engine.core.inout

import co.nilin.opex.matching.engine.core.model.MatchConstraint
import co.nilin.opex.matching.engine.core.model.OrderDirection
import co.nilin.opex.matching.engine.core.model.OrderType
import co.nilin.opex.matching.engine.core.model.Pair

class OrderSubmitRequestEvent(
ouid: String,
uuid: String,
pair: Pair,
val price: Long,
val quantity: Long,
val direction: OrderDirection,
val matchConstraint: MatchConstraint,
val orderType: OrderType,
val userLevel: String,
val orderId: Long? = null,
) : OrderRequestEvent(ouid, uuid, pair)
Loading