Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import co.nilin.opex.accountant.app.listener.AccountantEventListener
import co.nilin.opex.accountant.app.listener.AccountantTempEventListener
import co.nilin.opex.accountant.app.listener.AccountantTradeListener
import co.nilin.opex.accountant.app.listener.OrderListener
import co.nilin.opex.accountant.core.api.FeeCalculator
import co.nilin.opex.accountant.core.api.FinancialActionJobManager
import co.nilin.opex.accountant.core.api.OrderManager
import co.nilin.opex.accountant.core.api.TradeManager
import co.nilin.opex.accountant.core.api.*
import co.nilin.opex.accountant.core.service.FinancialActionJobManagerImpl
import co.nilin.opex.accountant.core.service.OrderManagerImpl
import co.nilin.opex.accountant.core.service.TradeManagerImpl
Expand All @@ -25,19 +22,6 @@ import org.springframework.scheduling.annotation.EnableScheduling
@EnableScheduling
class AppConfig {

@Bean
fun getFinancialActionJobManager(
financialActionLoader: FinancialActionLoader,
financialActionPersister: FinancialActionPersister,
walletProxy: WalletProxy
): FinancialActionJobManager {
return FinancialActionJobManagerImpl(
financialActionLoader,
financialActionPersister,
walletProxy
)
}

@Bean
fun orderManager(
pairConfigLoader: PairConfigLoader,
Expand All @@ -48,6 +32,7 @@ class AppConfig {
tempEventPersister: TempEventPersister,
tempEventRepublisher: TempEventRepublisher,
richOrderPublisher: RichOrderPublisher,
financialActionPublisher: FinancialActionPublisher,
): OrderManager {
return OrderManagerImpl(
pairConfigLoader,
Expand All @@ -56,7 +41,8 @@ class AppConfig {
financeActionLoader,
orderPersister,
tempEventPersister,
richOrderPublisher
richOrderPublisher,
financialActionPublisher
)
}

Expand All @@ -69,6 +55,7 @@ class AppConfig {
richTradePublisher: RichTradePublisher,
richOrderPublisher: RichOrderPublisher,
feeCalculator: FeeCalculator,
financialActionPublisher: FinancialActionPublisher,
): TradeManager {
return TradeManagerImpl(
financeActionPersister,
Expand All @@ -77,7 +64,8 @@ class AppConfig {
tempEventPersister,
richTradePublisher,
richOrderPublisher,
feeCalculator
feeCalculator,
financialActionPublisher
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ import co.nilin.opex.accountant.core.api.TradeManager
import co.nilin.opex.accountant.ports.kafka.listener.spi.TempEventListener
import co.nilin.opex.matching.engine.core.eventh.events.*
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory

class AccountantTempEventListener(
private val orderManager: OrderManager,
private val tradeManager: TradeManager
) : TempEventListener {

private val logger = LoggerFactory.getLogger(AccountantTempEventListener::class.java)

override fun id(): String {
return "TempEventListener"
}

override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) {
println("TempEvent $event")
logger.info("TempEvent received $event")
runBlocking {
when (event) {
is CreateOrderEvent -> orderManager.handleNewOrder(event)
Expand All @@ -29,6 +32,6 @@ class AccountantTempEventListener(
}
}
}
println("onEvent")
logger.info("TempEvent processed")
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package co.nilin.opex.accountant.app.listener

import co.nilin.opex.accountant.core.api.TradeManager
import co.nilin.opex.accountant.ports.kafka.listener.spi.Listener
import co.nilin.opex.accountant.ports.kafka.listener.spi.TradeListener
import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent
import kotlinx.coroutines.runBlocking
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
package co.nilin.opex.accountant.app.scheduler

import co.nilin.opex.accountant.core.api.FinancialActionJobManager
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service

@Service
@Profile("scheduled")
class FinancialActionsJob(val financialActionJobManager: FinancialActionJobManager) {
class FinancialActionsJob() {

private val log = LoggerFactory.getLogger(FinancialActionsJob::class.java)
private val scope = CoroutineScope(Dispatchers.IO)

@Scheduled(fixedDelay = 10000)
//@Scheduled(fixedDelay = 10000, initialDelay = 10000)
fun processFinancialActions() {
runBlocking(Dispatchers.IO) {
scope.ensureActive()
if (!scope.isCompleted())
return

scope.launch {
try {
//read unprocessed fa records and call transfer
financialActionJobManager.processFinancialActions(0, 100)
//financialActionProcessor.batchProcess(0, 100)
} catch (e: Exception) {
log.error("Job error!", e)
log.error("Financial action manager unable to batch process", e)
}
}
}

private fun CoroutineScope.isCompleted() = coroutineContext.job.children.all { it.isCompleted }

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package co.nilin.opex.accountant.core.api

interface FinancialActionJobManager {
suspend fun processFinancialActions(offset: Long, size: Long)
}
package co.nilin.opex.accountant.core.api
interface FinancialActionJobManager {
suspend fun processFinancialActions(offset: Long, size: Long)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package co.nilin.opex.accountant.core.inout

import java.math.BigDecimal
import java.time.LocalDateTime

data class FinancialActionEvent(
val uuid: String,
val symbol: String,
val amount: BigDecimal,
val sender: String,
val senderWalletType: String,
val receiver: String,
val receiverWalletType: String,
val createDate: LocalDateTime,
val transferRef: String?,
val description: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.core.model

import java.math.BigDecimal
import java.time.LocalDateTime
import java.util.UUID

class FinancialAction(
val parent: FinancialAction?,
Expand All @@ -14,10 +15,19 @@ class FinancialAction(
val receiver: String,
val receiverWalletType: String,
val createDate: LocalDateTime,
val retryCount: Int = 0,
val status: FinancialActionStatus = FinancialActionStatus.CREATED,
val uuid: String = UUID.randomUUID().toString(),
val id: Long? = null
) {

override fun equals(other: Any?): Boolean {
if (other == null || other !is FinancialAction) return false
return if (id != null && other.id != null)
id == other.id
else
uuid == other.uuid
}

override fun toString(): String {
return "FinancialAction(id=$id, parent=$parent, eventType='$eventType', pointer='$pointer', symbol='$symbol', amount=$amount, sender='$sender', senderWalletType='$senderWalletType', receiver='$receiver', receiverWalletType='$receiverWalletType', createDate=$createDate)"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,45 @@ import co.nilin.opex.accountant.core.model.FinancialAction
import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.spi.FinancialActionLoader
import co.nilin.opex.accountant.core.spi.FinancialActionPersister
import co.nilin.opex.accountant.core.spi.FinancialActionPublisher
import co.nilin.opex.accountant.core.spi.WalletProxy
import org.slf4j.LoggerFactory

class FinancialActionJobManagerImpl(
private val financialActionLoader: FinancialActionLoader,
private val financialActionPersister: FinancialActionPersister,
private val walletProxy: WalletProxy
private val financialActionPublisher: FinancialActionPublisher,
) : FinancialActionJobManager {

private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java)

override suspend fun processFinancialActions(offset: Long, size: Long) {
val factions = financialActionLoader.loadUnprocessed(offset, size)
val flatten = sortAndFlattenFA(factions)
logger.info("Loaded ${flatten.size} factions: ${flatten.map { it.id }}")
if (factions.isEmpty())
return
publishFinancialActions(factions)
}

try {
val requests = factions.map {
TransferRequest(
it.amount,
it.symbol,
it.sender,
it.senderWalletType,
it.receiver,
it.receiverWalletType,
null,
it.eventType + it.pointer
)
private suspend fun publishFinancialActions(financialActions: List<FinancialAction>) {
val list = arrayListOf<FinancialAction>()
financialActions.forEach { extractFAParents(it, list) }
for (fa in list) {
if (fa.status == FinancialActionStatus.CREATED) {
try {
financialActionPublisher.publish(fa)
} catch (e: Exception) {
logger.error("Cannot publish fa ${fa.uuid}", e)
break
}
financialActionPersister.updateStatus(fa, FinancialActionStatus.PROCESSED)
}
walletProxy.batchTransfer(requests)
financialActionPersister.updateBatchStatus(factions, FinancialActionStatus.PROCESSED)
} catch (e: Exception) {
logger.error("financial job error", e)
}
}

fun sortAndFlattenFA(list: List<FinancialAction>): Collection<FinancialAction> {
val result = arrayListOf<FinancialAction>()

fun extractParent(fa: FinancialAction) {
if (fa.parent != null)
extractParent(fa.parent)
result.add(fa)
private fun extractFAParents(financialAction: FinancialAction, list: ArrayList<FinancialAction>) {
if (financialAction.parent != null) {
extractFAParents(financialAction.parent, list)
}
list.forEach { extractParent(it) }
return result.distinctBy { it.id }

if (!list.contains(financialAction))
list.add(financialAction)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import co.nilin.opex.accountant.core.inout.OrderStatus
import co.nilin.opex.accountant.core.inout.RichOrder
import co.nilin.opex.accountant.core.inout.RichOrderUpdate
import co.nilin.opex.accountant.core.model.FinancialAction
import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.model.Order
import co.nilin.opex.accountant.core.spi.*
import co.nilin.opex.matching.engine.core.eventh.events.*
Expand All @@ -20,7 +21,8 @@ open class OrderManagerImpl(
private val financeActionLoader: FinancialActionLoader,
private val orderPersister: OrderPersister,
private val tempEventPersister: TempEventPersister,
private val richOrderPublisher: RichOrderPublisher
private val richOrderPublisher: RichOrderPublisher,
private val financialActionPublisher: FinancialActionPublisher
) : OrderManager {

@Transactional
Expand Down Expand Up @@ -94,7 +96,9 @@ open class OrderManagerImpl(
OrderStatus.REQUESTED.code
)
)
return financialActionPersister.persist(listOf(financialAction))
val fa = financialActionPersister.persist(listOf(financialAction))
publishFinancialAction(financialAction)
return fa
}

@Transactional
Expand Down Expand Up @@ -157,7 +161,9 @@ open class OrderManagerImpl(
OrderStatus.REJECTED
)
)
return financialActionPersister.persist(listOf(financialAction))
val fa = financialActionPersister.persist(listOf(financialAction))
publishFinancialAction(financialAction)
return fa
}

@Transactional
Expand Down Expand Up @@ -201,7 +207,9 @@ open class OrderManagerImpl(
OrderStatus.REJECTED
)
)
return financialActionPersister.persist(listOf(financialAction))
val fa = financialActionPersister.persist(listOf(financialAction))
publishFinancialAction(financialAction)
return fa
}

private suspend fun publishRichOrder(order: Order, remainedQuantity: BigDecimal, status: OrderStatus? = null) {
Expand Down Expand Up @@ -237,4 +245,14 @@ open class OrderManagerImpl(
)
)
}

private suspend fun publishFinancialAction(financialAction: FinancialAction) {
if (financialAction.parent != null)
publishFinancialAction(financialAction.parent)

if (financialAction.status == FinancialActionStatus.CREATED) {
financialActionPublisher.publish(financialAction)
financialActionPersister.updateStatus(financialAction.uuid, FinancialActionStatus.PROCESSED)
}
}
}
Loading