From 34a033bb701ccd90117960e139cbf94789516c65 Mon Sep 17 00:00:00 2001 From: Peyman Date: Wed, 7 Jun 2023 16:32:37 +0330 Subject: [PATCH 1/4] Fix order request event --- .../listener/MatchingEngineEventListener.kt | 33 ------- .../engine/app/listener/OrderListener.kt | 49 +++++++---- .../core/inout/OrderCancelRequestEvent.kt | 10 +++ .../engine/core/inout/OrderRequestEvent.kt | 5 ++ .../engine/core/inout/OrderSubmitRequest.kt | 19 ---- .../core/inout/OrderSubmitRequestEvent.kt | 19 ++++ .../kafka/listener/config/OrderKafkaConfig.kt | 11 +-- .../listener/consumer/OrderKafkaListener.kt | 16 ++-- .../listener/spi/OrderRequestEventListener.kt | 8 ++ .../spi/OrderSubmitRequestListener.kt | 8 -- .../submitter/config/EventsKafkaConfig.kt | 7 +- .../gateway/app/service/OrderService.kt | 18 ++-- .../gateway/app/service/OrderServiceTest.kt | 10 +-- .../submitter/config/OrderKafkaConfig.kt | 7 +- .../inout/OrderCancelRequestEvent.kt | 10 +++ .../submitter/inout/OrderRequestEvent.kt | 5 ++ ...tRequest.kt => OrderSubmitRequestEvent.kt} | 9 +- ...itter.kt => OrderRequestEventSubmitter.kt} | 10 +-- preferences-dev.yml | 86 ++++++++++++++----- 19 files changed, 201 insertions(+), 139 deletions(-) create mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt create mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt delete mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt create mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt create mode 100644 matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt delete mode 100644 matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt create mode 100644 matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt create mode 100644 matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt rename matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/{OrderSubmitRequest.kt => OrderSubmitRequestEvent.kt} (80%) rename matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/{OrderSubmitter.kt => OrderRequestEventSubmitter.kt} (69%) diff --git a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt index 0725120d0..682e7560d 100644 --- a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt +++ b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt @@ -1,13 +1,7 @@ package co.nilin.opex.matching.engine.app.listener -import co.nilin.opex.matching.engine.app.bl.OrderBooks -import co.nilin.opex.matching.engine.core.eventh.events.CancelOrderEvent import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.engine.core.eventh.events.EditOrderRequestEvent -import co.nilin.opex.matching.engine.core.inout.OrderCancelCommand -import co.nilin.opex.matching.engine.core.inout.OrderEditCommand import co.nilin.opex.matching.engine.ports.kafka.listener.spi.EventListener -import kotlinx.coroutines.runBlocking import org.slf4j.LoggerFactory class MatchingEngineEventListener : EventListener { @@ -20,32 +14,5 @@ class MatchingEngineEventListener : EventListener { override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) { logger.info("Received CoreEvent: ${event::class.java}") - - runBlocking { - val orderBook = OrderBooks.lookupOrderBook("${event.pair.leftSideName}_${event.pair.rightSideName}") - - when (event) { - is EditOrderRequestEvent -> orderBook.handleEditCommand( - OrderEditCommand( - event.ouid, - event.uuid, - event.orderId, - event.pair, - event.price, - event.quantity - ) - ) - - is CancelOrderEvent -> orderBook.handleCancelCommand( - OrderCancelCommand( - event.ouid, - event.uuid, - event.orderId, - event.pair - ) - ) - else -> null - } - } } } \ No newline at end of file diff --git a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt index dc55ef4f2..b19ab8291 100644 --- a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt +++ b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt @@ -1,32 +1,49 @@ package co.nilin.opex.matching.engine.app.listener import co.nilin.opex.matching.engine.app.bl.OrderBooks -import co.nilin.opex.matching.engine.core.inout.OrderCreateCommand -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest -import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderSubmitRequestListener +import co.nilin.opex.matching.engine.core.inout.* +import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderRequestEventListener +import org.slf4j.LoggerFactory -class OrderListener : OrderSubmitRequestListener { +class OrderListener : OrderRequestEventListener { + + private val logger = LoggerFactory.getLogger(OrderListener::class.java) override fun id(): String { return "OrderListener" } - override suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { + override suspend fun onOrder(order: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) { + logger.info("OrderRequestEvent received. ${order::class.java.simpleName} ouid=${order.ouid}") val orderBook = OrderBooks.lookupOrderBook( order.pair.leftSideName + "_" + order.pair.rightSideName ) - orderBook.handleNewOrderCommand( - OrderCreateCommand( - order.ouid, - order.uuid, - order.pair, - order.price, - order.quantity, - order.direction, - order.matchConstraint, - order.orderType + + when (order) { + is OrderSubmitRequestEvent -> orderBook.handleNewOrderCommand( + OrderCreateCommand( + order.ouid, + order.uuid, + order.pair, + order.price, + order.quantity, + order.direction, + order.matchConstraint, + order.orderType + ) ) - ) + + is OrderCancelRequestEvent -> orderBook.handleCancelCommand( + OrderCancelCommand( + order.ouid, + order.uuid, + order.orderId, + order.pair + ) + ) + + else -> logger.warn("Unknown event type of OrderRequestEvent") + } } } \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt new file mode 100644 index 000000000..50cd7da61 --- /dev/null +++ b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.matching.engine.core.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderCancelRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val orderId: Long +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt new file mode 100644 index 000000000..3ec793a34 --- /dev/null +++ b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.matching.engine.core.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair) \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt deleted file mode 100644 index e8c0e2fb2..000000000 --- a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt +++ /dev/null @@ -1,19 +0,0 @@ -package co.nilin.opex.matching.engine.core.inout - -import co.nilin.opex.matching.engine.core.model.MatchConstraint -import co.nilin.opex.matching.engine.core.model.OrderDirection -import co.nilin.opex.matching.engine.core.model.OrderType -import co.nilin.opex.matching.engine.core.model.Pair - -class OrderSubmitRequest( - var ouid: String, - var uuid: String, - var pair: Pair, - var orderId: Long? = null, - var price: Long = 0, - var quantity: Long = 0, - var direction: OrderDirection = OrderDirection.BID, - var matchConstraint: MatchConstraint = MatchConstraint.GTC, - var orderType: OrderType = OrderType.LIMIT_ORDER, - var userLevel: String = "" -) \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt new file mode 100644 index 000000000..96d3e03a1 --- /dev/null +++ b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt @@ -0,0 +1,19 @@ +package co.nilin.opex.matching.engine.core.inout + +import co.nilin.opex.matching.engine.core.model.MatchConstraint +import co.nilin.opex.matching.engine.core.model.OrderDirection +import co.nilin.opex.matching.engine.core.model.OrderType +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderSubmitRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val price: Long, + val quantity: Long, + val direction: OrderDirection, + val matchConstraint: MatchConstraint, + val orderType: OrderType, + val userLevel: String, + val orderId: Long? = null, +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt index 2817f2c00..0cb4147f5 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt @@ -1,7 +1,8 @@ package co.nilin.opex.matching.engine.ports.kafka.listener.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent import co.nilin.opex.matching.engine.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.matching.engine.ports.kafka.listener.consumer.OrderKafkaListener import org.apache.kafka.clients.consumer.ConsumerConfig @@ -40,12 +41,12 @@ class OrderKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event=co.nilin.opex.matching.engine.core.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.matching.engine.core.inout.OrderCancelRequestEvent" ) } @Bean("orderConsumerFactory") - fun consumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { + fun consumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @@ -57,8 +58,8 @@ class OrderKafkaConfig { @Autowired fun configureListener( orderKafkaListener: OrderKafkaListener, - @Qualifier("orderKafkaTemplate") template: KafkaTemplate, - @Qualifier("orderConsumerFactory") consumerFactory: ConsumerFactory + @Qualifier("orderKafkaTemplate") template: KafkaTemplate, + @Qualifier("orderConsumerFactory") consumerFactory: ConsumerFactory ) { val topics = symbols.map { s -> "orders_$s" }.toTypedArray() val containerProps = ContainerProperties(*topics) diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt index 69a6327d2..16fd32a75 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt @@ -1,16 +1,18 @@ package co.nilin.opex.matching.engine.ports.kafka.listener.consumer -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest -import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderSubmitRequestListener +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent +import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderRequestEventListener import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.kafka.listener.MessageListener import org.springframework.stereotype.Component @Component -class OrderKafkaListener : MessageListener { - val orderListeners = arrayListOf() - override fun onMessage(data: ConsumerRecord) { +class OrderKafkaListener : MessageListener { + + val orderListeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { orderListeners.forEach { tl -> runBlocking { tl.onOrder(data.value(), data.partition(), data.offset(), data.timestamp()) @@ -18,11 +20,11 @@ class OrderKafkaListener : MessageListener { } } - fun addOrderListener(tl: OrderSubmitRequestListener) { + fun addOrderListener(tl: OrderRequestEventListener) { orderListeners.add(tl) } - fun removeOrderListener(tl: OrderSubmitRequestListener) { + fun removeOrderListener(tl: OrderRequestEventListener) { orderListeners.removeIf { item -> item.id() == tl.id() } diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt new file mode 100644 index 000000000..32342e348 --- /dev/null +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.matching.engine.ports.kafka.listener.spi + +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent + +interface OrderRequestEventListener { + fun id(): String + suspend fun onOrder(order: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) +} \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt deleted file mode 100644 index e2268cef6..000000000 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt +++ /dev/null @@ -1,8 +0,0 @@ -package co.nilin.opex.matching.engine.ports.kafka.listener.spi - -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest - -interface OrderSubmitRequestListener { - fun id(): String - suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) -} \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt index 53828b3b3..6fdf44e04 100644 --- a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt +++ b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt @@ -1,7 +1,8 @@ package co.nilin.opex.matching.engine.ports.kafka.submitter.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Qualifier @@ -40,12 +41,12 @@ class EventsKafkaConfig { } @Bean("orderProducerFactory") - fun orderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + fun orderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @Bean("orderKafkaTemplate") - fun orderKafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + fun orderKafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } diff --git a/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt b/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt index 05b2f0fa5..432e778c1 100644 --- a/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt +++ b/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt @@ -1,17 +1,16 @@ package co.nilin.opex.matching.gateway.app.service -import co.nilin.opex.matching.engine.core.eventh.events.CancelOrderEvent import co.nilin.opex.matching.engine.core.model.OrderDirection import co.nilin.opex.matching.engine.core.model.Pair import co.nilin.opex.matching.gateway.app.inout.CancelOrderRequest import co.nilin.opex.matching.gateway.app.inout.CreateOrderRequest import co.nilin.opex.matching.gateway.app.spi.AccountantApiProxy import co.nilin.opex.matching.gateway.app.spi.PairConfigLoader -import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderCancelRequestEvent +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequestEvent import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitResult -import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.EventSubmitter import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.KafkaHealthIndicator -import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderSubmitter +import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderRequestEventSubmitter import co.nilin.opex.utility.error.data.OpexError import co.nilin.opex.utility.error.data.OpexException import org.slf4j.LoggerFactory @@ -21,8 +20,7 @@ import java.math.BigDecimal @Service class OrderService( val accountantApiProxy: AccountantApiProxy, - val orderSubmitter: OrderSubmitter, - val eventSubmitter: EventSubmitter, + val orderRequestEventSubmitter: OrderRequestEventSubmitter, val pairConfigLoader: PairConfigLoader, private val kafkaHealthIndicator: KafkaHealthIndicator, ) { @@ -55,7 +53,7 @@ class OrderService( if (!kafkaHealthIndicator.isHealthy) throw OpexException(OpexError.ServiceUnavailable) - val orderSubmitRequest = OrderSubmitRequest( + val orderSubmitRequest = OrderSubmitRequestEvent( createOrderRequest.uuid!!, //get from auth2 Pair(symbolSides[0], symbolSides[1]), createOrderRequest.price @@ -69,12 +67,12 @@ class OrderService( createOrderRequest.orderType, createOrderRequest.userLevel ) - return orderSubmitter.submit(orderSubmitRequest) + return orderRequestEventSubmitter.submit(orderSubmitRequest) } suspend fun cancelOrder(request: CancelOrderRequest): OrderSubmitResult { val symbols = request.symbol.split("_") - val event = CancelOrderEvent(request.ouid, request.uuid, request.orderId, Pair(symbols[0], symbols[1])) - return eventSubmitter.submit(event) + val event = OrderCancelRequestEvent(request.ouid, request.uuid, Pair(symbols[0], symbols[1]), request.orderId) + return orderRequestEventSubmitter.submit(event) } } diff --git a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt index 7c09ca47b..b1f25539d 100644 --- a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt +++ b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt @@ -7,7 +7,7 @@ import co.nilin.opex.matching.gateway.app.spi.PairConfigLoader import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitResult import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.EventSubmitter import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.KafkaHealthIndicator -import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderSubmitter +import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderRequestEventSubmitter import io.mockk.* import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat @@ -17,13 +17,13 @@ import java.math.BigDecimal private class OrderServiceTest { private val accountantApiProxy: AccountantApiProxy = mockk() - private val orderSubmitter: OrderSubmitter = mockk() + private val orderRequestEventSubmitter: OrderRequestEventSubmitter = mockk() private val eventSubmitter: EventSubmitter = mockk() private val pairConfigLoader: PairConfigLoader = mockk() private val kafkaHealthIndicator: KafkaHealthIndicator = mockk() private val orderService: OrderService = OrderService( accountantApiProxy, - orderSubmitter, + orderRequestEventSubmitter, eventSubmitter, pairConfigLoader, kafkaHealthIndicator @@ -44,7 +44,7 @@ private class OrderServiceTest { ) } returns true coEvery { - orderSubmitter.submit(any()) + orderRequestEventSubmitter.submit(any()) } returns OrderSubmitResult(null) coEvery { kafkaHealthIndicator.isHealthy @@ -66,7 +66,7 @@ private class OrderServiceTest { ) } returns true coEvery { - orderSubmitter.submit(any()) + orderRequestEventSubmitter.submit(any()) } returns OrderSubmitResult(null) coEvery { kafkaHealthIndicator.isHealthy diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt index a8dab0441..faad92c36 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt @@ -1,7 +1,8 @@ package co.nilin.opex.matching.gateway.ports.kafka.submitter.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderRequestEvent +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequestEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Qualifier @@ -31,12 +32,12 @@ class OrderKafkaConfig { } @Bean("orderProducerFactory") - fun producerFactory(@Qualifier("orderProducerConfigs") producerConfigs: Map): ProducerFactory { + fun producerFactory(@Qualifier("orderProducerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @Bean("orderKafkaTemplate") - fun kafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + fun kafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt new file mode 100644 index 000000000..c9b467a20 --- /dev/null +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.matching.gateway.ports.kafka.submitter.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderCancelRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val orderId: Long +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt new file mode 100644 index 000000000..462f2f58a --- /dev/null +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.matching.gateway.ports.kafka.submitter.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair) \ No newline at end of file diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequest.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequestEvent.kt similarity index 80% rename from matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequest.kt rename to matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequestEvent.kt index d6c5eb05e..099c06a2a 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequest.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequestEvent.kt @@ -6,15 +6,14 @@ import co.nilin.opex.matching.engine.core.model.OrderType import co.nilin.opex.matching.engine.core.model.Pair import java.util.* -data class OrderSubmitRequest( - val uuid: String, - val pair: Pair, +class OrderSubmitRequestEvent( + uuid: String, + pair: Pair, val price: Long, val quantity: Long, val direction: OrderDirection, val matchConstraint: MatchConstraint, val orderType: OrderType, val userLevel: String, - val ouid: String = UUID.randomUUID().toString(), val orderId: Long? = null, -) \ No newline at end of file +) : OrderRequestEvent(UUID.randomUUID().toString(), uuid, pair) \ No newline at end of file diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderRequestEventSubmitter.kt similarity index 69% rename from matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt rename to matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderRequestEventSubmitter.kt index 57ed1bd61..d66ee7ac6 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderRequestEventSubmitter.kt @@ -1,6 +1,6 @@ package co.nilin.opex.matching.gateway.ports.kafka.submitter.service -import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderRequestEvent import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitResult import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -10,12 +10,12 @@ import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @Component -class OrderSubmitter(val kafkaTemplate: KafkaTemplate) { +class OrderRequestEventSubmitter(val kafkaTemplate: KafkaTemplate) { - private val logger = LoggerFactory.getLogger(OrderSubmitter::class.java) + private val logger = LoggerFactory.getLogger(OrderRequestEventSubmitter::class.java) - suspend fun submit(order: OrderSubmitRequest): OrderSubmitResult = suspendCoroutine { cont -> - logger.info("Submitting OrderSubmitRequest: ouid=${order.ouid}") + suspend fun submit(order: OrderRequestEvent): OrderSubmitResult = suspendCoroutine { cont -> + logger.info("Submitting OrderRequestEvent: ouid=${order.ouid}") val sendFuture = kafkaTemplate.send("orders_${order.pair.leftSideName}_${order.pair.rightSideName}", order) sendFuture.addCallback({ diff --git a/preferences-dev.yml b/preferences-dev.yml index d8d616c99..87e30f3ef 100644 --- a/preferences-dev.yml +++ b/preferences-dev.yml @@ -10,38 +10,84 @@ chains: - url: http://bitcoin-scanner:8080 maxBlockRange: 30 delayOnRateLimit: 5 - schedule: - delay: 600 - errorDelay: 60 - timeout: 30 - maxRetries: 5 - confirmations: 0 - enabled: false + maxParallelCall: 2 + schedules: + - workerType: MAIN + delay: 600 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 4 + enabled: false + - workerType: ERROR + delay: 600 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 4 + enabled: false + - workerType: DELAYED + delay: 300 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 2 + enabled: false - name: test-ethereum addressType: ethereum scanners: - url: http://ethereum-scanner:8080 maxBlockRange: 30 delayOnRateLimit: 5 - schedule: - delay: 15 - errorDelay: 7 - timeout: 30 - maxRetries: 5 - confirmations: 0 - enabled: false + maxParallelCall: 3 + schedules: + - workerType: MAIN + delay: 15 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 10 + enabled: false + - workerType: ERROR + delay: 7 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 10 + enabled: false + - workerType: DELAYED + delay: 15 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 5 + enabled: false - name: test-bsc addressType: ethereum scanners: - url: http://bsc-scanner:8080 maxBlockRange: 10 delayOnRateLimit: 300 - schedule: - delay: 6 - errorDelay: 3 - timeout: 30 - maxRetries: 50 - confirmations: 0 + maxParallelCall: 5 + schedules: + - workerType: MAIN + delay: 6 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 30 + - workerType: ERROR + delay: 3 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 20 + - workerType: DELAYED + delay: 10 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 10 currencies: - symbol: IRT name: Toman From 996c19eec08c01806418676b0dbccd420b646089 Mon Sep 17 00:00:00 2001 From: Peyman Date: Sun, 11 Jun 2023 16:06:04 +0330 Subject: [PATCH 2/4] Close #359: Fixed cancel order request event processing --- .../accountant/app/listener/OrderListener.kt | 38 ++++++++++--------- .../src/main/resources/application.yml | 4 +- .../core/service/OrderManagerImpl.kt | 6 ++- .../listener/config/AccountantKafkaConfig.kt | 2 +- .../listener/consumer/OrderKafkaListener.kt | 4 +- .../listener/inout/OrderCancelRequestEvent.kt | 10 +++++ .../kafka/listener/inout/OrderRequestEvent.kt | 5 +++ ...tRequest.kt => OrderSubmitRequestEvent.kt} | 18 ++++----- .../spi/OrderSubmitRequestListener.kt | 4 +- .../kafka/listener/config/OrderKafkaConfig.kt | 2 +- .../gateway/app/service/OrderServiceTest.kt | 1 - .../submitter/config/OrderKafkaConfig.kt | 3 +- 12 files changed, 59 insertions(+), 38 deletions(-) create mode 100644 accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt create mode 100644 accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt rename accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/{OrderSubmitRequest.kt => OrderSubmitRequestEvent.kt} (64%) diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt index 52f95ef56..3ebd44422 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt @@ -1,7 +1,8 @@ package co.nilin.opex.accountant.app.listener import co.nilin.opex.accountant.core.api.OrderManager -import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent import kotlinx.coroutines.runBlocking @@ -15,25 +16,26 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest return "OrderListener" } - override fun onEvent(event: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { + override fun onEvent(event: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) { runBlocking { - logger.info("Order submit event received ${event.ouid}") - - orderManager.handleRequestOrder( - SubmitOrderEvent( - event.ouid, - event.uuid, - event.orderId, - event.pair, - event.price, - event.quantity, - event.quantity, - event.direction, - event.matchConstraint, - event.orderType, - event.userLevel + if (event is OrderSubmitRequestEvent) { + logger.info("Order submit event received ${event.ouid}") + orderManager.handleRequestOrder( + SubmitOrderEvent( + event.ouid, + event.uuid, + event.orderId, + event.pair, + event.price, + event.quantity, + event.quantity, + event.direction, + event.matchConstraint, + event.orderType, + event.userLevel + ) ) - ) + } } } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/resources/application.yml b/accountant/accountant-app/src/main/resources/application.yml index 0bf62e5a8..a41c93880 100644 --- a/accountant/accountant-app/src/main/resources/application.yml +++ b/accountant/accountant-app/src/main/resources/application.yml @@ -1,8 +1,8 @@ server.port: 8080 logging: level: - co.nilin: DEBUG - reactor.netty.http.client: DEBUG + co.nilin: INFO + reactor.netty.http.client: INFO spring: application: name: opex-accountant diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index afc94a6ab..0922e415a 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -9,6 +9,7 @@ import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.* +import co.nilin.opex.matching.engine.core.inout.RequestedOperation import co.nilin.opex.matching.engine.core.model.OrderDirection import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal @@ -122,6 +123,9 @@ open class OrderManagerImpl( @Transactional override suspend fun handleRejectOrder(rejectOrderEvent: RejectOrderEvent): List { + if (rejectOrderEvent.requestedOperation != RequestedOperation.PLACE_ORDER) + return emptyList() + //order by ouid val order = orderPersister.load(rejectOrderEvent.ouid) if (order == null) { @@ -185,7 +189,7 @@ open class OrderManagerImpl( //create fa for transfer remaining transfered uuid symbol exchange wallet to uuid main exchange wallet val financialAction = FinancialAction( parentFinancialAction, - RejectOrderEvent::class.simpleName!!, + CancelOrderEvent::class.simpleName!!, cancelOrderEvent.ouid, symbol, order.remainedTransferAmount, diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt index 096fac5d9..f6c45d6fa 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt @@ -39,7 +39,7 @@ class AccountantKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent" ) } diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt index 08b3d7df5..b01ae2c9a 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt @@ -1,8 +1,8 @@ package co.nilin.opex.accountant.ports.kafka.listener.consumer -import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener import org.springframework.stereotype.Component @Component -class OrderKafkaListener : EventConsumer() \ No newline at end of file +class OrderKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt new file mode 100644 index 000000000..c10c325ac --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.accountant.ports.kafka.listener.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderCancelRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val orderId: Long +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt new file mode 100644 index 000000000..04876ee6f --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.accountant.ports.kafka.listener.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair) \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequest.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt similarity index 64% rename from accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequest.kt rename to accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt index 6790172ae..71e7275da 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequest.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt @@ -5,15 +5,15 @@ import co.nilin.opex.matching.engine.core.model.OrderDirection import co.nilin.opex.matching.engine.core.model.OrderType import co.nilin.opex.matching.engine.core.model.Pair -data class OrderSubmitRequest( - val ouid: String, - val uuid: String, - val orderId: Long?, - val pair: Pair, - val price: Long = 0, - val quantity: Long = 0, +class OrderSubmitRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val price: Long, + val quantity: Long, val direction: OrderDirection, val matchConstraint: MatchConstraint, val orderType: OrderType, - val userLevel: String -) \ No newline at end of file + val userLevel: String, + val orderId: Long? = null, +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt index ac9a24904..ae224ab1d 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt @@ -1,5 +1,5 @@ package co.nilin.opex.accountant.ports.kafka.listener.spi -import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent -interface OrderSubmitRequestListener : Listener \ No newline at end of file +interface OrderSubmitRequestListener : Listener \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt index 0cb4147f5..6262ba1df 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt @@ -41,7 +41,7 @@ class OrderKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request_event=co.nilin.opex.matching.engine.core.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.matching.engine.core.inout.OrderCancelRequestEvent" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.matching.engine.core.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.matching.engine.core.inout.OrderCancelRequestEvent" ) } diff --git a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt index b1f25539d..574e32c8b 100644 --- a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt +++ b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt @@ -24,7 +24,6 @@ private class OrderServiceTest { private val orderService: OrderService = OrderService( accountantApiProxy, orderRequestEventSubmitter, - eventSubmitter, pairConfigLoader, kafkaHealthIndicator ) diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt index faad92c36..0e0bf5189 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt @@ -12,6 +12,7 @@ import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer import org.springframework.kafka.support.serializer.JsonSerializer @Configuration @@ -27,7 +28,7 @@ class OrderKafkaConfig { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", - JsonSerializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderCancelRequestEvent" ) } From fcf84de9de455b2e4d13c558a333866e975deb96 Mon Sep 17 00:00:00 2001 From: Peyman Date: Sun, 11 Jun 2023 16:24:42 +0330 Subject: [PATCH 3/4] Fix test failures --- .../core/service/OrderManagerImplTest.kt | 19 +++++++++++++++++-- .../gateway/app/service/OrderServiceTest.kt | 13 +------------ 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index 9406ff297..6b50ff506 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -222,7 +222,7 @@ internal class OrderManagerImplTest { "user_1", 56, Pair("BTC", "USDT"), - RequestedOperation.CANCEL_ORDER, + RequestedOperation.PLACE_ORDER, RejectReason.ORDER_NOT_FOUND ) @@ -234,6 +234,21 @@ internal class OrderManagerImplTest { coVerify(exactly = 1) { tempEventPersister.saveTempEvent(any(), any()) } } + @Test + fun givenRejectOrderReceived_whenOperationNotPlaceOrder_returnEmptyFA(): Unit = runBlocking { + val orderEvent = RejectOrderEvent( + "ouid", + "user_1", + 56, + Pair("BTC", "USDT"), + RequestedOperation.CANCEL_ORDER, + RejectReason.ORDER_NOT_FOUND + ) + + val fa = orderManager.handleRejectOrder(orderEvent) + assertThat(fa.size).isEqualTo(0) + } + @Test fun givenRejectOrderReceived_whenLocalFound_publishRichOrderUpdate(): Unit = runBlocking { val orderEvent = RejectOrderEvent( @@ -246,7 +261,7 @@ internal class OrderManagerImplTest { OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER, - RequestedOperation.CANCEL_ORDER, + RequestedOperation.PLACE_ORDER, RejectReason.ORDER_NOT_FOUND, ) coEvery { orderPersister.load(any()) } returns Valid.order diff --git a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt index 574e32c8b..306586dff 100644 --- a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt +++ b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt @@ -18,7 +18,7 @@ import java.math.BigDecimal private class OrderServiceTest { private val accountantApiProxy: AccountantApiProxy = mockk() private val orderRequestEventSubmitter: OrderRequestEventSubmitter = mockk() - private val eventSubmitter: EventSubmitter = mockk() + private val eventSubmitter: OrderRequestEventSubmitter = mockk() private val pairConfigLoader: PairConfigLoader = mockk() private val kafkaHealthIndicator: KafkaHealthIndicator = mockk() private val orderService: OrderService = OrderService( @@ -174,15 +174,4 @@ private class OrderServiceTest { } }.isNotInstanceOf(MockKException::class.java) } - - @Test - fun givenEventSubmitter_whenCancelOrder_thenOrderSubmitResult(): Unit = runBlocking { - coEvery { - eventSubmitter.submit(any()) - } returns OrderSubmitResult(null) - - val orderSubmitResult = orderService.cancelOrder(VALID.CANCEL_ORDER_REQUEST) - - assertThat(orderSubmitResult).isNotNull - } } From 56b0f36bb32e65fc478f7398e69f7e601aa87997 Mon Sep 17 00:00:00 2001 From: Peyman Date: Tue, 13 Jun 2023 17:14:51 +0330 Subject: [PATCH 4/4] Revert CancelOrderEvent class type on cancel order --- .../co/nilin/opex/accountant/core/service/OrderManagerImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index 0922e415a..34456db1d 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -189,7 +189,7 @@ open class OrderManagerImpl( //create fa for transfer remaining transfered uuid symbol exchange wallet to uuid main exchange wallet val financialAction = FinancialAction( parentFinancialAction, - CancelOrderEvent::class.simpleName!!, + RejectOrderEvent::class.simpleName!!, cancelOrderEvent.ouid, symbol, order.remainedTransferAmount,