Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion accountant/accountant-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>co.nilin.opex.accountant</groupId>
<artifactId>accountant</artifactId>
<version>1.0.0-beta.3</version>
<version>1.0.1-beta.7</version>
</parent>

<groupId>co.nilin.opex.accountant.app</groupId>
Expand Down Expand Up @@ -72,6 +72,53 @@
<groupId>co.nilin.opex.utility.preferences</groupId>
<artifactId>preferences</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>3.2.6</version>
<type>test-jar</type>
<scope>test</scope>
<classifier>test-binder</classifier>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.9.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito.kotlin</groupId>
<artifactId>mockito-kotlin</artifactId>
</dependency>
<dependency>
<groupId>io.mockk</groupId>
<artifactId>mockk</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ import org.springframework.scheduling.annotation.EnableScheduling
@EnableScheduling
class AppConfig {

@Bean
fun getFinancialActionJobManager(
financialActionLoader: FinancialActionLoader,
financialActionPersister: FinancialActionPersister,
walletProxy: WalletProxy
): FinancialActionJobManager {
return FinancialActionJobManagerImpl(
financialActionLoader,
financialActionPersister,
walletProxy
)
}

@Bean
fun orderManager(
pairConfigLoader: PairConfigLoader,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package co.nilin.opex.accountant.app.listener

import co.nilin.opex.accountant.core.api.OrderManager
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent
import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent
import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener
import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent
import kotlinx.coroutines.runBlocking
Expand All @@ -15,25 +16,26 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest
return "OrderListener"
}

override fun onEvent(event: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) {
override fun onEvent(event: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) {
runBlocking {
logger.info("Order submit event received ${event.ouid}")

orderManager.handleRequestOrder(
SubmitOrderEvent(
event.ouid,
event.uuid,
event.orderId,
event.pair,
event.price,
event.quantity,
event.quantity,
event.direction,
event.matchConstraint,
event.orderType,
event.userLevel
if (event is OrderSubmitRequestEvent) {
logger.info("Order submit event received ${event.ouid}")
orderManager.handleRequestOrder(
SubmitOrderEvent(
event.ouid,
event.uuid,
event.orderId,
event.pair,
event.price,
event.quantity,
event.quantity,
event.direction,
event.matchConstraint,
event.orderType,
event.userLevel
)
)
)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.nilin.opex.accountant.app.scheduler

import co.nilin.opex.accountant.core.api.FinancialActionJobManager
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Profile
Expand All @@ -8,12 +9,12 @@ import org.springframework.stereotype.Service

@Service
@Profile("scheduled")
class FinancialActionsJob() {
class FinancialActionsJob(private val financialActionJobManager: FinancialActionJobManager) {

private val log = LoggerFactory.getLogger(FinancialActionsJob::class.java)
private val scope = CoroutineScope(Dispatchers.IO)

//@Scheduled(fixedDelay = 10000, initialDelay = 10000)
@Scheduled(fixedDelay = 10000, initialDelay = 10000)
fun processFinancialActions() {
scope.ensureActive()
if (!scope.isCompleted())
Expand All @@ -22,9 +23,9 @@ class FinancialActionsJob() {
scope.launch {
try {
//read unprocessed fa records and call transfer
//financialActionProcessor.batchProcess(0, 100)
financialActionJobManager.processFinancialActions(0, 100)
} catch (e: Exception) {
log.error("Financial action manager unable to batch process", e)
log.error("Job error!", e)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import io.micrometer.core.instrument.MeterRegistry
import org.springframework.boot.actuate.health.HealthComponent
import org.springframework.boot.actuate.health.HealthEndpoint
import org.springframework.boot.actuate.health.SystemHealth
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

@Component
@Profile("!test")
class PrometheusHealthExtension(
private val registry: MeterRegistry,
private val endpoint: HealthEndpoint
Expand Down
4 changes: 2 additions & 2 deletions accountant/accountant-app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
server.port: 8080
logging:
level:
co.nilin: DEBUG
reactor.netty.http.client: DEBUG
co.nilin: INFO
reactor.netty.http.client: INFO
spring:
application:
name: opex-accountant
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package co.nilin.opex.accountant.app

import org.junit.jupiter.api.Test
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration
import org.springframework.context.annotation.Import
import org.springframework.test.context.ActiveProfiles

@SpringBootTest
@ActiveProfiles("test")
@Import(TestChannelBinderConfiguration::class)
class AccountantAppTest {
@Test
fun contextLoad() {

}
}
50 changes: 50 additions & 0 deletions accountant/accountant-app/src/test/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
server.port: 8080
logging:
level:
co.nilin: DEBUG
reactor.netty.http.client: DEBUG
spring:
application:
name: opex-accountant
main:
allow-bean-definition-overriding: true
allow-circular-references: true
kafka:
bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092}
consumer:
group-id: accountant
r2dbc:
url: r2dbc:tc:postgresql:///accountant?TC_IMAGE_TAG=9.6.8
initialization-mode: always
cloud:
bootstrap:
enabled: true
discovery:
enabled: false
consul:
enabled: false
config:
enabled: false
vault:
enabled: false

management:
health:
vault:
enabled: false
endpoints:
web:
base-path: /actuator
exposure:
include: ["health", "metrics"]
endpoint:
health:
show-details: always
metrics:
enabled: true
prometheus:
enabled: false
app:
address: 1
wallet:
url: ""
2 changes: 1 addition & 1 deletion accountant/accountant-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>co.nilin.opex.accountant</groupId>
<artifactId>accountant</artifactId>
<version>1.0.0-beta.3</version>
<version>1.0.1-beta.7</version>
</parent>

<groupId>co.nilin.opex.accountant.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,47 @@
package co.nilin.opex.accountant.core.service

import co.nilin.opex.accountant.core.api.FinancialActionJobManager
import co.nilin.opex.accountant.core.inout.TransferRequest
import co.nilin.opex.accountant.core.model.FinancialAction
import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.spi.FinancialActionLoader
import co.nilin.opex.accountant.core.spi.FinancialActionPersister
import co.nilin.opex.accountant.core.spi.FinancialActionPublisher
import co.nilin.opex.accountant.core.spi.WalletProxy
import org.slf4j.LoggerFactory

class FinancialActionJobManagerImpl(
private val financialActionLoader: FinancialActionLoader,
private val financialActionPersister: FinancialActionPersister,
private val financialActionPublisher: FinancialActionPublisher,
private val walletProxy: WalletProxy
) : FinancialActionJobManager {

private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java)

override suspend fun processFinancialActions(offset: Long, size: Long) {
val factions = financialActionLoader.loadUnprocessed(offset, size)
publishFinancialActions(factions)
}

private suspend fun publishFinancialActions(financialActions: List<FinancialAction>) {
val list = arrayListOf<FinancialAction>()
financialActions.forEach { extractFAParents(it, list) }
for (fa in list) {
if (fa.status == FinancialActionStatus.CREATED) {
try {
financialActionPublisher.publish(fa)
} catch (e: Exception) {
logger.error("Cannot publish fa ${fa.uuid}", e)
break
override suspend fun processFinancialActions(offset: Long, size: Long){
val factions = financialActionLoader.loadReadyToProcess(offset, size)
factions.forEach {
try {
if (it.parent != null) {
val reloadParent = financialActionLoader.loadFinancialAction(it.parent.id)!!
if (reloadParent.status != FinancialActionStatus.PROCESSED) {
logger.warn("financial job {} skipped because of parent status {}", it, reloadParent)
return@forEach
}
}
financialActionPersister.updateStatus(fa, FinancialActionStatus.PROCESSED)
walletProxy.transfer(
it.symbol,
it.senderWalletType,
it.sender,
it.receiverWalletType,
it.receiver,
it.amount,
it.eventType + it.pointer,
null
)
financialActionPersister.updateStatusNewTx(it, FinancialActionStatus.PROCESSED)

} catch (e: Exception) {
logger.error("financial job error", e)
financialActionPersister.updateStatusNewTx(it, FinancialActionStatus.ERROR)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.model.Order
import co.nilin.opex.accountant.core.spi.*
import co.nilin.opex.matching.engine.core.eventh.events.*
import co.nilin.opex.matching.engine.core.inout.RequestedOperation
import co.nilin.opex.matching.engine.core.model.OrderDirection
import org.springframework.transaction.annotation.Transactional
import java.math.BigDecimal
Expand Down Expand Up @@ -96,9 +97,9 @@ open class OrderManagerImpl(
OrderStatus.REQUESTED.code
)
)
val fa = financialActionPersister.persist(listOf(financialAction))
publishFinancialAction(financialAction)
return fa
return financialActionPersister.persist(listOf(financialAction))
/*publishFinancialAction(financialAction)
return fa*/
}

@Transactional
Expand All @@ -122,6 +123,9 @@ open class OrderManagerImpl(

@Transactional
override suspend fun handleRejectOrder(rejectOrderEvent: RejectOrderEvent): List<FinancialAction> {
if (rejectOrderEvent.requestedOperation != RequestedOperation.PLACE_ORDER)
return emptyList()

//order by ouid
val order = orderPersister.load(rejectOrderEvent.ouid)
if (order == null) {
Expand Down Expand Up @@ -161,9 +165,9 @@ open class OrderManagerImpl(
OrderStatus.REJECTED
)
)
val fa = financialActionPersister.persist(listOf(financialAction))
publishFinancialAction(financialAction)
return fa
return financialActionPersister.persist(listOf(financialAction))
/*publishFinancialAction(financialAction)
return fa*/
}

@Transactional
Expand Down Expand Up @@ -207,9 +211,9 @@ open class OrderManagerImpl(
OrderStatus.CANCELED
)
)
val fa = financialActionPersister.persist(listOf(financialAction))
publishFinancialAction(financialAction)
return fa
return financialActionPersister.persist(listOf(financialAction))
/*publishFinancialAction(financialAction)
return fa*/
}

private suspend fun publishRichOrder(order: Order, remainedQuantity: BigDecimal, status: OrderStatus? = null) {
Expand Down
Loading