Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package co.nilin.opex.app.bl

import co.nilin.opex.app.config.AppSchedulers
import co.nilin.opex.matching.core.eventh.EventDispatcher
import co.nilin.opex.matching.core.eventh.events.*
import co.nilin.opex.matching.core.spi.OrderBookPersister
import co.nilin.opex.port.order.kafka.service.EventsSubmitter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.springframework.stereotype.Component

@Component
class ExchangeEventHandler(eventsSubmitter: EventsSubmitter)
class ExchangeEventHandler(eventsSubmitter: EventsSubmitter
, orderBookPersister: OrderBookPersister)
{
fun register(){
EventDispatcher.register(CreateOrderEvent::class.java, handler)
Expand All @@ -18,12 +21,19 @@ class ExchangeEventHandler(eventsSubmitter: EventsSubmitter)
EventDispatcher.register(RejectOrderEvent::class.java, handler)
EventDispatcher.register(SubmitOrderEvent::class.java, handler)
EventDispatcher.register(TradeEvent::class.java, handler)
EventDispatcher.register(OrderBookPublishedEvent::class.java, localHandler)
}

val handler: (CoreEvent) -> Unit = {
CoroutineScope(Dispatchers.Default).launch {
CoroutineScope(AppSchedulers.generalExecutor).launch {
eventsSubmitter.submit(it)
}
}

val localHandler: (OrderBookPublishedEvent) -> Unit = {
CoroutineScope(AppSchedulers.generalExecutor).launch {
orderBookPersister.storeLastState(it.persistentOrderBook)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class AppConfig {


@Bean
fun orderListener(orderBookPersister: OrderBookPersister): OrderListener {
return OrderListener(orderBookPersister)
fun orderListener(): OrderListener {
return OrderListener()
}

@Autowired
Expand All @@ -76,8 +76,8 @@ class AppConfig {
}

@Bean
fun eventListener(orderBookPersister: OrderBookPersister): MatchingEngineEventListener {
return MatchingEngineEventListener(orderBookPersister)
fun eventListener(): MatchingEngineEventListener {
return MatchingEngineEventListener()
}

@Autowired
Expand All @@ -90,7 +90,7 @@ class AppConfig {
exchangeEventHandler.register()
}

class OrderListener(private val orderBookPersister: OrderBookPersister) : OrderSubmitRequestListener {
class OrderListener() : OrderSubmitRequestListener {

override fun id(): String {
return "OrderListener"
Expand All @@ -113,11 +113,10 @@ class AppConfig {
order.orderType
)
)
orderBookPersister.storeLastState(orderBook.persistent())
}
}

class MatchingEngineEventListener(private val orderBookPersister: OrderBookPersister) : EventListener {
class MatchingEngineEventListener() : EventListener {

private val logger = LoggerFactory.getLogger(MatchingEngineEventListener::class.java)

Expand All @@ -132,7 +131,7 @@ class AppConfig {
val orderBook = OrderBooks.lookupOrderBook("${event.pair.leftSideName}_${event.pair.rightSideName}")

when (event) {
is UpdatedOrderEvent -> orderBook.handleEditCommand(
is EditOrderRequestEvent -> orderBook.handleEditCommand(
OrderEditCommand(
event.ouid,
event.uuid,
Expand All @@ -151,9 +150,8 @@ class AppConfig {
event.pair
)
)
else -> null
}

orderBookPersister.storeLastState(orderBook.persistent())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package co.nilin.opex.matching.core.engine

import co.nilin.opex.matching.core.eventh.EventDispatcher
import co.nilin.opex.matching.core.eventh.events.OrderBookPublishedEvent
import co.nilin.opex.matching.core.inout.OrderCancelCommand
import co.nilin.opex.matching.core.inout.OrderCreateCommand
import co.nilin.opex.matching.core.inout.OrderEditCommand
import co.nilin.opex.matching.core.model.MatchConstraint
import co.nilin.opex.matching.core.model.OrderDirection
import co.nilin.opex.matching.core.model.OrderType
import co.nilin.opex.matching.core.model.PersistentOrderBook
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.util.*

class OrderBookEventEmitsUnitTest {
val pair = co.nilin.opex.matching.core.model.Pair("BTC", "USDT")
val uuid = UUID.randomUUID().toString()

var persistentOrderBook: PersistentOrderBook? = null

@BeforeEach
fun setup(){
val localHandler: (OrderBookPublishedEvent) -> Unit = {
persistentOrderBook = it.persistentOrderBook
}
EventDispatcher.register(OrderBookPublishedEvent::class.java, localHandler)
}

@Test
fun givenOrderBook_whenOrderCreated_thenOrderBookEventPublished() {
//given
val orderBook = SimpleOrderBook(pair, false)
//when
orderBook.handleNewOrderCommand(OrderCreateCommand(UUID.randomUUID().toString(), uuid, pair, 1, 1, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER))
//then
Assertions.assertNotNull(persistentOrderBook)
}



@Test
fun givenOrderBook_whenCancelOrder_thenOrderBookEventPublished(){
//given
val orderBook = SimpleOrderBook(pair, false)
val firstOrderId = UUID.randomUUID().toString()
val secondOrderId = UUID.randomUUID().toString()

val firstOrder = orderBook.handleNewOrderCommand(OrderCreateCommand(firstOrderId, uuid, pair, 2, 1, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER))
orderBook.handleNewOrderCommand(OrderCreateCommand(secondOrderId, uuid, pair, 1, 1, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER))
persistentOrderBook = null
//when
orderBook.handleCancelCommand(OrderCancelCommand(firstOrderId, uuid, firstOrder!!.id()!!, pair))
//then
Assertions.assertNotNull(persistentOrderBook)
}


@Test
fun givenOrderBook_whenEditOrder_thenOrderBookEventPublished(){
//given
val orderBook = SimpleOrderBook(pair, false)
orderBook.handleNewOrderCommand(OrderCreateCommand(UUID.randomUUID().toString(), uuid, pair, 2, 1, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER))
val secondOrder = orderBook.handleNewOrderCommand(OrderCreateCommand(UUID.randomUUID().toString(), uuid, pair, 2, 3, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER))
orderBook.handleNewOrderCommand(OrderCreateCommand(UUID.randomUUID().toString(), uuid, pair, 1, 1, OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER))
persistentOrderBook = null
//when
orderBook.handleEditCommand(OrderEditCommand(UUID.randomUUID().toString(), uuid, secondOrder!!.id()!!, pair, 3, 2))
//then
Assertions.assertNotNull(persistentOrderBook)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import co.nilin.opex.matching.core.eventh.EventDispatcher
import co.nilin.opex.matching.core.eventh.events.*
import co.nilin.opex.matching.core.inout.*
import co.nilin.opex.matching.core.model.*
import co.nilin.opex.matching.core.spi.OrderBookPersister
import exchange.core2.collections.art.LongAdaptiveRadixTreeMap
import org.slf4j.LoggerFactory
import java.util.*
Expand Down Expand Up @@ -127,6 +128,7 @@ class SimpleOrderBook(val pair: Pair, var replayMode: Boolean) : OrderBook {
}
}
lastOrder = order
EventDispatcher.emit(OrderBookPublishedEvent(persistent()))
logCurrentState()
return order
}
Expand Down Expand Up @@ -165,6 +167,7 @@ class SimpleOrderBook(val pair: Pair, var replayMode: Boolean) : OrderBook {
order.matchConstraint, order.orderType
))
}
EventDispatcher.emit(OrderBookPublishedEvent(persistent()))
logCurrentState()
}

Expand Down Expand Up @@ -215,6 +218,7 @@ class SimpleOrderBook(val pair: Pair, var replayMode: Boolean) : OrderBook {
if (queueOrder.filledQuantity != queueOrder.quantity) {
putGtcInQueue(queueOrder)
}
EventDispatcher.emit(OrderBookPublishedEvent(persistent()))
queueOrder
}
MatchConstraint.IOC -> {
Expand All @@ -232,6 +236,7 @@ class SimpleOrderBook(val pair: Pair, var replayMode: Boolean) : OrderBook {
))
}
}
EventDispatcher.emit(OrderBookPublishedEvent(persistent()))
queueOrder
}
else -> {
Expand All @@ -244,7 +249,7 @@ class SimpleOrderBook(val pair: Pair, var replayMode: Boolean) : OrderBook {

}

fun handleCancelOrder(order: SimpleOrder, bucketQueue: LongAdaptiveRadixTreeMap<Bucket>, bestOrder: SimpleOrder?, setBestOrder: (SimpleOrder?) -> Unit) {
private fun handleCancelOrder(order: SimpleOrder, bucketQueue: LongAdaptiveRadixTreeMap<Bucket>, bestOrder: SimpleOrder?, setBestOrder: (SimpleOrder?) -> Unit) {
val bucket = order.bucket!!
bucket.ordersCount--
bucket.totalQuantity -= order.remainedQuantity()
Expand Down Expand Up @@ -354,7 +359,7 @@ class SimpleOrderBook(val pair: Pair, var replayMode: Boolean) : OrderBook {
}


fun putGtcInQueue(order: SimpleOrder,
private fun putGtcInQueue(order: SimpleOrder,
queue: LongAdaptiveRadixTreeMap<Bucket>,
bestOrder: SimpleOrder?,
betterBucketSelector: (price: Long, queue: LongAdaptiveRadixTreeMap<Bucket>) -> Bucket?,
Expand Down Expand Up @@ -422,7 +427,7 @@ class SimpleOrderBook(val pair: Pair, var replayMode: Boolean) : OrderBook {
return lastOrder
}

override fun persistent(): PersistentOrderBook {
private fun persistent(): PersistentOrderBook {
val persistent = PersistentOrderBook(pair)
persistent.lastOrder = lastOrder?.persistent()
persistent.orders = orders.values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package co.nilin.opex.matching.core.eventh
import co.nilin.opex.matching.core.eventh.events.CoreEvent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import java.util.*
import java.util.concurrent.Executors
import kotlin.coroutines.suspendCoroutine

object EventDispatcher {

private val executorService = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
private val eventsHandler = mutableMapOf<Class<*>, MutableList<EventListener<*>>>()

@JvmStatic
Expand All @@ -25,14 +25,12 @@ object EventDispatcher {
}


fun emit(event: CoreEvent) = CoroutineScope(executorService).launch {
fun emit(event: CoreEvent) {
var type: Class<*>? = event::class.java
while (type != null) {
eventsHandler[type]?.forEach { eventsHandler ->
suspendCoroutine {
kotlin.runCatching {
eventsHandler(event)
}
kotlin.runCatching {
eventsHandler(event)
}
}
type = type.superclass
Expand All @@ -41,7 +39,7 @@ object EventDispatcher {


open class EventListener<T>(
val lambda: (T) -> Unit
val lambda: (T) -> Unit
) {
operator fun invoke(event: Any) {
lambda(event as T)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package co.nilin.opex.matching.core.eventh.events

class EditOrderRequestEvent(): CoreEvent() , OneOrderEvent{
var ouid: String = ""
var uuid: String = ""
var orderId: Long = 0
var price: Long = 0
var quantity: Long = 0

constructor(ouid: String,
uuid: String,
orderId: Long,
pair: co.nilin.opex.matching.core.model.Pair,
price: Long,
quantity: Long,
)
: this(){
this.ouid = ouid
this.uuid = uuid
this.orderId = orderId
this.pair = pair
this.price = price
this.quantity = quantity
}

override fun ouid(): String {
return ouid
}

override fun uuid(): String {
return uuid
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package co.nilin.opex.matching.core.eventh.events

import co.nilin.opex.matching.core.model.PersistentOrderBook

data class OrderBookPublishedEvent(val persistentOrderBook: PersistentOrderBook): CoreEvent()
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ interface OrderBook {
fun handleNewOrderCommand(orderCommand: OrderCreateCommand): Order?
fun handleCancelCommand(orderCommand: OrderCancelCommand)
fun handleEditCommand(orderCommand: OrderEditCommand): Order?
fun persistent(): PersistentOrderBook
}