Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ open class OrderManagerImpl(
order.price.toBigDecimal(),
order.quantity.toBigDecimal(),
cancelOrderEvent.remainedQuantity.toBigDecimal(),
OrderStatus.REJECTED
OrderStatus.CANCELED
)
)
val fa = financialActionPersister.persist(listOf(financialAction))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ enum class OrderStatus(val code: Int, val orderOfAppearance: Int) {
return orderOfAppearance > status.orderOfAppearance
}

fun isOpenOrder(): Boolean {
return this == NEW || this == PARTIALLY_FILLED
}

companion object {
fun fromCode(code: Int?): OrderStatus? {
if (code == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package co.nilin.opex.market.ports.postgres.dao

import co.nilin.opex.market.ports.postgres.model.OpenOrderModel
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono
import java.math.BigDecimal

@Repository
interface OpenOrderRepository : ReactiveCrudRepository<OpenOrderModel, Long> {

@Query(
"""
insert into open_orders (ouid, executed_quantity, status)
values (:ouid, :executedQuantity, :status)
on conflict (ouid)
do update set executed_quantity = excluded.executed_quantity, status = :status
"""
)
fun insertOrUpdate(ouid: String, executedQuantity: BigDecimal?, status: Int): Mono<Void>

@Query("update open_orders set executed_quantity = :executedQuantity, status = :status where ouid = :ouid")
fun update(ouid: String, executedQuantity: BigDecimal?): Mono<Void>

@Query("delete from open_orders where ouid = :ouid")
fun delete(ouid: String): Mono<Void>

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ interface OrderRepository : ReactiveCrudRepository<OrderModel, Long> {
@Query(
"""
select * from orders
join order_status os on orders.ouid = os.ouid
join open_orders oo on orders.ouid = oo.ouid
where uuid = :uuid and (:symbol is null or symbol = :symbol) and status in (:statuses)
and appearance = (select max(appearance) from order_status where ouid = orders.ouid)
and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid)
order by create_date desc
limit :limit
"""
Expand Down Expand Up @@ -81,13 +79,11 @@ interface OrderRepository : ReactiveCrudRepository<OrderModel, Long> {

@Query(
"""
select price, (sum(quantity) - sum(os.executed_quantity)) as quantity from orders
join order_status os on orders.ouid = os.ouid
where symbol = :symbol and side = :direction and os.status in (:statuses)
and appearance = (select max(appearance) from order_status where ouid = orders.ouid)
and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid)
select price, (sum(quantity) - sum(oo.executed_quantity)) as quantity from orders
join open_orders oo on orders.ouid = oo.ouid
where symbol = :symbol and side = :direction and status in (:statuses)
group by price
order by price asc
order by price asc
limit :limit
"""
)
Expand All @@ -104,11 +100,9 @@ interface OrderRepository : ReactiveCrudRepository<OrderModel, Long> {

@Query(
"""
select price, (sum(quantity) - sum(executed_quantity)) as quantity from orders
join order_status os on orders.ouid = os.ouid
select price, (sum(quantity) - sum(oo.executed_quantity)) as quantity from orders
join open_orders oo on orders.ouid = oo.ouid
where symbol = :symbol and side = :direction and status in (:statuses)
and appearance = (select max(appearance) from order_status where ouid = orders.ouid)
and executed_quantity = (select max(executed_quantity) from order_status where ouid = orders.ouid)
group by price
order by price desc
limit :limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,28 @@ import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono
import java.math.BigDecimal
import java.time.LocalDateTime

@Repository
interface OrderStatusRepository : ReactiveCrudRepository<OrderStatusModel, Long> {

@Query(
"""
insert into order_status (ouid, executed_quantity, accumulative_quote_qty, status, appearance, date)
values (:ouid, :executedQuantity, :accumulativeQuoteQuantity, :status, :appearance, :date)
on conflict do nothing
"""
)
fun insert(
ouid: String,
executedQuantity: BigDecimal,
accumulativeQuoteQuantity: BigDecimal,
status: Int,
appearance: Int,
date: LocalDateTime = LocalDateTime.now()
): Mono<Void>

@Query(
"""
with max_appearance as (select max(appearance) as max_app from order_status where ouid = :ouid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,30 @@ import co.nilin.opex.market.core.event.RichOrderUpdate
import co.nilin.opex.market.core.inout.Order
import co.nilin.opex.market.core.inout.OrderStatus
import co.nilin.opex.market.core.spi.OrderPersister
import co.nilin.opex.market.ports.postgres.dao.OpenOrderRepository
import co.nilin.opex.market.ports.postgres.dao.OrderRepository
import co.nilin.opex.market.ports.postgres.dao.OrderStatusRepository
import co.nilin.opex.market.ports.postgres.model.OrderModel
import co.nilin.opex.market.ports.postgres.model.OrderStatusModel
import co.nilin.opex.market.ports.postgres.util.asOrderDTO
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import java.time.LocalDateTime

@Component
class OrderPersisterImpl(
private val orderRepository: OrderRepository,
private val orderStatusRepository: OrderStatusRepository
private val orderStatusRepository: OrderStatusRepository,
private val openOrderRepository: OpenOrderRepository,
) : OrderPersister {

private val logger = LoggerFactory.getLogger(OrderPersisterImpl::class.java)

@Transactional
override suspend fun save(order: RichOrder) {
orderRepository.save(
OrderModel(
Expand All @@ -50,33 +55,46 @@ class OrderPersisterImpl(
).awaitFirstOrNull()
logger.info("order ${order.ouid} saved")

orderStatusRepository.save(
OrderStatusModel(
order.ouid,
order.executedQuantity,
order.accumulativeQuoteQty,
OrderStatus.NEW.code,
OrderStatus.NEW.orderOfAppearance
)
orderStatusRepository.insert(
order.ouid,
order.executedQuantity,
order.accumulativeQuoteQty,
OrderStatus.NEW.code,
OrderStatus.NEW.orderOfAppearance
).awaitFirstOrNull()
logger.info("OrderStatus ${order.ouid} saved with status of 'NEW'")

val lastStatus = orderStatusRepository.findMostRecentByOUID(order.ouid).awaitSingle()
if (OrderStatus.fromCode(lastStatus.status)!!.isOpenOrder()) {
openOrderRepository.insertOrUpdate(order.ouid, lastStatus.executedQuantity, lastStatus.status)
.awaitFirstOrNull()
logger.info("Order ${order.ouid} added to open orders")
} else {
openOrderRepository.delete(order.ouid).awaitSingleOrNull()
logger.info("Order ${order.ouid} deleted from open orders")
}
}

@Transactional
override suspend fun update(orderUpdate: RichOrderUpdate) {
try {
orderStatusRepository.save(
OrderStatusModel(
orderUpdate.ouid,
orderUpdate.executedQuantity(),
orderUpdate.accumulativeQuoteQuantity(),
orderUpdate.status.code,
orderUpdate.status.orderOfAppearance
)
).awaitFirstOrNull()
} catch (e: Exception) {
logger.error("Error updating order status: ${e.message}")
}
orderStatusRepository.insert(
orderUpdate.ouid,
orderUpdate.executedQuantity(),
orderUpdate.accumulativeQuoteQuantity(),
orderUpdate.status.code,
orderUpdate.status.orderOfAppearance
).awaitFirstOrNull()
logger.info("OrderStatus ${orderUpdate.ouid} updated with status of ${orderUpdate.status}")

val lastStatus = orderStatusRepository.findMostRecentByOUID(orderUpdate.ouid).awaitSingle()
if (OrderStatus.fromCode(lastStatus.status)!!.isOpenOrder()) {
openOrderRepository.insertOrUpdate(orderUpdate.ouid, lastStatus.executedQuantity, lastStatus.status)
.awaitFirstOrNull()
logger.info("Order ${orderUpdate.ouid} added to open orders")
} else {
openOrderRepository.delete(orderUpdate.ouid).awaitSingleOrNull()
logger.info("Order ${orderUpdate.ouid} deleted from open orders")
}
}

override suspend fun load(ouid: String): Order? {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package co.nilin.opex.market.ports.postgres.model

import org.springframework.data.annotation.Id
import org.springframework.data.relational.core.mapping.Table
import java.math.BigDecimal

@Table("open_orders")
data class OpenOrderModel(
val ouid: String,
val executedQuantity: BigDecimal?,
val status: Int,
@Id
val id: Long? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ CREATE TABLE IF NOT EXISTS order_status
UNIQUE (ouid, status, appearance, executed_quantity)
);

CREATE TABLE IF NOT EXISTS open_orders
(
id SERIAL PRIMARY KEY,
ouid VARCHAR(72) NOT NULL UNIQUE ,
executed_quantity DECIMAL,
status INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS trades
(
id SERIAL PRIMARY KEY,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.nilin.opex.market.ports.postgres.impl

import co.nilin.opex.market.ports.postgres.dao.OpenOrderRepository
import co.nilin.opex.market.ports.postgres.dao.OrderRepository
import co.nilin.opex.market.ports.postgres.dao.OrderStatusRepository
import co.nilin.opex.market.ports.postgres.impl.sample.VALID
Expand All @@ -13,25 +14,44 @@ import reactor.core.publisher.Mono
class OrderPersisterTest {
private val orderRepository: OrderRepository = mockk()
private val orderStatusRepository: OrderStatusRepository = mockk()
private val orderPersister = OrderPersisterImpl(orderRepository, orderStatusRepository)
private val openOrderRepository: OpenOrderRepository = mockk()
private val orderPersister = OrderPersisterImpl(orderRepository, orderStatusRepository, openOrderRepository)

@Test
fun givenOrderRepo_whenSaveRichOrder_thenSuccess(): Unit = runBlocking {
every {
orderRepository.save(any())
} returns Mono.just(VALID.MAKER_ORDER_MODEL)
every {
orderStatusRepository.save(any())
orderStatusRepository.insert(any(), any(), any(), any(), any(), any())
} returns Mono.empty()
every {
orderStatusRepository.findMostRecentByOUID(any())
} returns Mono.just(VALID.MAKER_ORDER_STATUS_MODEL)
every {
openOrderRepository.insertOrUpdate(any(), any(), any())
} returns Mono.empty()
every {
openOrderRepository.delete(any<String>())
} returns Mono.empty()

assertThatNoException().isThrownBy { runBlocking { orderPersister.save(VALID.RICH_ORDER) } }
}

@Test
fun givenOrderRepo_whenUpdateRichOrder_thenSuccess(): Unit = runBlocking {
every {
orderStatusRepository.save(any())
orderStatusRepository.insert(any(), any(), any(), any(), any(), any())
} returns Mono.empty()
every {
orderStatusRepository.findMostRecentByOUID(any())
} returns Mono.just(VALID.MAKER_ORDER_STATUS_MODEL)
every {
openOrderRepository.insertOrUpdate(any(), any(), any())
} returns Mono.empty()
every {
openOrderRepository.delete(any<String>())
} returns Mono.empty()

assertThatNoException().isThrownBy { runBlocking { orderPersister.update(VALID.RICH_ORDER_UPDATE) } }
}
Expand Down