diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index ed2571adc..5ce39ecf6 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -204,7 +204,7 @@ open class OrderManagerImpl( order.price.toBigDecimal(), order.quantity.toBigDecimal(), cancelOrderEvent.remainedQuantity.toBigDecimal(), - OrderStatus.REJECTED + OrderStatus.CANCELED ) ) val fa = financialActionPersister.persist(listOf(financialAction)) diff --git a/market/market-core/src/main/kotlin/co/nilin/opex/market/core/inout/OrderEnums.kt b/market/market-core/src/main/kotlin/co/nilin/opex/market/core/inout/OrderEnums.kt index 7a1ec7594..a674d541d 100644 --- a/market/market-core/src/main/kotlin/co/nilin/opex/market/core/inout/OrderEnums.kt +++ b/market/market-core/src/main/kotlin/co/nilin/opex/market/core/inout/OrderEnums.kt @@ -31,6 +31,10 @@ enum class OrderStatus(val code: Int, val orderOfAppearance: Int) { return orderOfAppearance > status.orderOfAppearance } + fun isOpenOrder(): Boolean { + return this == NEW || this == PARTIALLY_FILLED + } + companion object { fun fromCode(code: Int?): OrderStatus? { if (code == null) diff --git a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OpenOrderRepository.kt b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OpenOrderRepository.kt new file mode 100644 index 000000000..033b09364 --- /dev/null +++ b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OpenOrderRepository.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.market.ports.postgres.dao + +import co.nilin.opex.market.ports.postgres.model.OpenOrderModel +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono +import java.math.BigDecimal + +@Repository +interface OpenOrderRepository : ReactiveCrudRepository { + + @Query( + """ + insert into open_orders (ouid, executed_quantity, status) + values (:ouid, :executedQuantity, :status) + on conflict (ouid) + do update set executed_quantity = excluded.executed_quantity, status = :status + """ + ) + fun insertOrUpdate(ouid: String, executedQuantity: BigDecimal?, status: Int): Mono + + @Query("update open_orders set executed_quantity = :executedQuantity, status = :status where ouid = :ouid") + fun update(ouid: String, executedQuantity: BigDecimal?): Mono + + @Query("delete from open_orders where ouid = :ouid") + fun delete(ouid: String): Mono + +} \ No newline at end of file diff --git a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt index 810cc540a..8b060034f 100644 --- a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt +++ b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderRepository.kt @@ -40,10 +40,8 @@ interface OrderRepository : ReactiveCrudRepository { @Query( """ select * from orders - join order_status os on orders.ouid = os.ouid + join open_orders oo on orders.ouid = oo.ouid where uuid = :uuid and (:symbol is null or symbol = :symbol) and status in (:statuses) - and appearance = (select max(appearance) from order_status where ouid = orders.ouid) - and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid) order by create_date desc limit :limit """ @@ -81,13 +79,11 @@ interface OrderRepository : ReactiveCrudRepository { @Query( """ - select price, (sum(quantity) - sum(os.executed_quantity)) as quantity from orders - join order_status os on orders.ouid = os.ouid - where symbol = :symbol and side = :direction and os.status in (:statuses) - and appearance = (select max(appearance) from order_status where ouid = orders.ouid) - and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid) + select price, (sum(quantity) - sum(oo.executed_quantity)) as quantity from orders + join open_orders oo on orders.ouid = oo.ouid + where symbol = :symbol and side = :direction and status in (:statuses) group by price - order by price asc + order by price asc limit :limit """ ) @@ -104,11 +100,9 @@ interface OrderRepository : ReactiveCrudRepository { @Query( """ - select price, (sum(quantity) - sum(executed_quantity)) as quantity from orders - join order_status os on orders.ouid = os.ouid + select price, (sum(quantity) - sum(oo.executed_quantity)) as quantity from orders + join open_orders oo on orders.ouid = oo.ouid where symbol = :symbol and side = :direction and status in (:statuses) - and appearance = (select max(appearance) from order_status where ouid = orders.ouid) - and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid) group by price order by price desc limit :limit diff --git a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderStatusRepository.kt b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderStatusRepository.kt index 81e75a0bc..11261d982 100644 --- a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderStatusRepository.kt +++ b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/dao/OrderStatusRepository.kt @@ -5,10 +5,28 @@ import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository import reactor.core.publisher.Mono +import java.math.BigDecimal +import java.time.LocalDateTime @Repository interface OrderStatusRepository : ReactiveCrudRepository { + @Query( + """ + insert into order_status (ouid, executed_quantity, accumulative_quote_qty, status, appearance, date) + values (:ouid, :executedQuantity, :accumulativeQuoteQuantity, :status, :appearance, :date) + on conflict do nothing + """ + ) + fun insert( + ouid: String, + executedQuantity: BigDecimal, + accumulativeQuoteQuantity: BigDecimal, + status: Int, + appearance: Int, + date: LocalDateTime = LocalDateTime.now() + ): Mono + @Query( """ with max_appearance as (select max(appearance) as max_app from order_status where ouid = :ouid) diff --git a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterImpl.kt b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterImpl.kt index 23dbc6b4e..0cc782a76 100644 --- a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterImpl.kt +++ b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterImpl.kt @@ -5,25 +5,30 @@ import co.nilin.opex.market.core.event.RichOrderUpdate import co.nilin.opex.market.core.inout.Order import co.nilin.opex.market.core.inout.OrderStatus import co.nilin.opex.market.core.spi.OrderPersister +import co.nilin.opex.market.ports.postgres.dao.OpenOrderRepository import co.nilin.opex.market.ports.postgres.dao.OrderRepository import co.nilin.opex.market.ports.postgres.dao.OrderStatusRepository import co.nilin.opex.market.ports.postgres.model.OrderModel import co.nilin.opex.market.ports.postgres.model.OrderStatusModel import co.nilin.opex.market.ports.postgres.util.asOrderDTO import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.slf4j.LoggerFactory import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Transactional import java.time.LocalDateTime @Component class OrderPersisterImpl( private val orderRepository: OrderRepository, - private val orderStatusRepository: OrderStatusRepository + private val orderStatusRepository: OrderStatusRepository, + private val openOrderRepository: OpenOrderRepository, ) : OrderPersister { private val logger = LoggerFactory.getLogger(OrderPersisterImpl::class.java) + @Transactional override suspend fun save(order: RichOrder) { orderRepository.save( OrderModel( @@ -50,33 +55,46 @@ class OrderPersisterImpl( ).awaitFirstOrNull() logger.info("order ${order.ouid} saved") - orderStatusRepository.save( - OrderStatusModel( - order.ouid, - order.executedQuantity, - order.accumulativeQuoteQty, - OrderStatus.NEW.code, - OrderStatus.NEW.orderOfAppearance - ) + orderStatusRepository.insert( + order.ouid, + order.executedQuantity, + order.accumulativeQuoteQty, + OrderStatus.NEW.code, + OrderStatus.NEW.orderOfAppearance ).awaitFirstOrNull() logger.info("OrderStatus ${order.ouid} saved with status of 'NEW'") + + val lastStatus = orderStatusRepository.findMostRecentByOUID(order.ouid).awaitSingle() + if (OrderStatus.fromCode(lastStatus.status)!!.isOpenOrder()) { + openOrderRepository.insertOrUpdate(order.ouid, lastStatus.executedQuantity, lastStatus.status) + .awaitFirstOrNull() + logger.info("Order ${order.ouid} added to open orders") + } else { + openOrderRepository.delete(order.ouid).awaitSingleOrNull() + logger.info("Order ${order.ouid} deleted from open orders") + } } + @Transactional override suspend fun update(orderUpdate: RichOrderUpdate) { - try { - orderStatusRepository.save( - OrderStatusModel( - orderUpdate.ouid, - orderUpdate.executedQuantity(), - orderUpdate.accumulativeQuoteQuantity(), - orderUpdate.status.code, - orderUpdate.status.orderOfAppearance - ) - ).awaitFirstOrNull() - } catch (e: Exception) { - logger.error("Error updating order status: ${e.message}") - } + orderStatusRepository.insert( + orderUpdate.ouid, + orderUpdate.executedQuantity(), + orderUpdate.accumulativeQuoteQuantity(), + orderUpdate.status.code, + orderUpdate.status.orderOfAppearance + ).awaitFirstOrNull() logger.info("OrderStatus ${orderUpdate.ouid} updated with status of ${orderUpdate.status}") + + val lastStatus = orderStatusRepository.findMostRecentByOUID(orderUpdate.ouid).awaitSingle() + if (OrderStatus.fromCode(lastStatus.status)!!.isOpenOrder()) { + openOrderRepository.insertOrUpdate(orderUpdate.ouid, lastStatus.executedQuantity, lastStatus.status) + .awaitFirstOrNull() + logger.info("Order ${orderUpdate.ouid} added to open orders") + } else { + openOrderRepository.delete(orderUpdate.ouid).awaitSingleOrNull() + logger.info("Order ${orderUpdate.ouid} deleted from open orders") + } } override suspend fun load(ouid: String): Order? { diff --git a/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/model/OpenOrderModel.kt b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/model/OpenOrderModel.kt new file mode 100644 index 000000000..291468c8e --- /dev/null +++ b/market/market-ports/market-persister-postgres/src/main/kotlin/co/nilin/opex/market/ports/postgres/model/OpenOrderModel.kt @@ -0,0 +1,14 @@ +package co.nilin.opex.market.ports.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Table +import java.math.BigDecimal + +@Table("open_orders") +data class OpenOrderModel( + val ouid: String, + val executedQuantity: BigDecimal?, + val status: Int, + @Id + val id: Long? = null, +) \ No newline at end of file diff --git a/market/market-ports/market-persister-postgres/src/main/resources/schema.sql b/market/market-ports/market-persister-postgres/src/main/resources/schema.sql index dd12ca995..f8e03a5f0 100644 --- a/market/market-ports/market-persister-postgres/src/main/resources/schema.sql +++ b/market/market-ports/market-persister-postgres/src/main/resources/schema.sql @@ -34,6 +34,14 @@ CREATE TABLE IF NOT EXISTS order_status UNIQUE (ouid, status, appearance, executed_quantity) ); +CREATE TABLE IF NOT EXISTS open_orders +( + id SERIAL PRIMARY KEY, + ouid VARCHAR(72) NOT NULL UNIQUE , + executed_quantity DECIMAL, + status INTEGER NOT NULL +); + CREATE TABLE IF NOT EXISTS trades ( id SERIAL PRIMARY KEY, diff --git a/market/market-ports/market-persister-postgres/src/test/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterTest.kt b/market/market-ports/market-persister-postgres/src/test/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterTest.kt index 55909bf19..e55c92d94 100644 --- a/market/market-ports/market-persister-postgres/src/test/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterTest.kt +++ b/market/market-ports/market-persister-postgres/src/test/kotlin/co/nilin/opex/market/ports/postgres/impl/OrderPersisterTest.kt @@ -1,5 +1,6 @@ package co.nilin.opex.market.ports.postgres.impl +import co.nilin.opex.market.ports.postgres.dao.OpenOrderRepository import co.nilin.opex.market.ports.postgres.dao.OrderRepository import co.nilin.opex.market.ports.postgres.dao.OrderStatusRepository import co.nilin.opex.market.ports.postgres.impl.sample.VALID @@ -13,7 +14,8 @@ import reactor.core.publisher.Mono class OrderPersisterTest { private val orderRepository: OrderRepository = mockk() private val orderStatusRepository: OrderStatusRepository = mockk() - private val orderPersister = OrderPersisterImpl(orderRepository, orderStatusRepository) + private val openOrderRepository: OpenOrderRepository = mockk() + private val orderPersister = OrderPersisterImpl(orderRepository, orderStatusRepository, openOrderRepository) @Test fun givenOrderRepo_whenSaveRichOrder_thenSuccess(): Unit = runBlocking { @@ -21,8 +23,17 @@ class OrderPersisterTest { orderRepository.save(any()) } returns Mono.just(VALID.MAKER_ORDER_MODEL) every { - orderStatusRepository.save(any()) + orderStatusRepository.insert(any(), any(), any(), any(), any(), any()) + } returns Mono.empty() + every { + orderStatusRepository.findMostRecentByOUID(any()) } returns Mono.just(VALID.MAKER_ORDER_STATUS_MODEL) + every { + openOrderRepository.insertOrUpdate(any(), any(), any()) + } returns Mono.empty() + every { + openOrderRepository.delete(any()) + } returns Mono.empty() assertThatNoException().isThrownBy { runBlocking { orderPersister.save(VALID.RICH_ORDER) } } } @@ -30,8 +41,17 @@ class OrderPersisterTest { @Test fun givenOrderRepo_whenUpdateRichOrder_thenSuccess(): Unit = runBlocking { every { - orderStatusRepository.save(any()) + orderStatusRepository.insert(any(), any(), any(), any(), any(), any()) + } returns Mono.empty() + every { + orderStatusRepository.findMostRecentByOUID(any()) } returns Mono.just(VALID.MAKER_ORDER_STATUS_MODEL) + every { + openOrderRepository.insertOrUpdate(any(), any(), any()) + } returns Mono.empty() + every { + openOrderRepository.delete(any()) + } returns Mono.empty() assertThatNoException().isThrownBy { runBlocking { orderPersister.update(VALID.RICH_ORDER_UPDATE) } } }