From afc4d44a6a8fc280031b8c4cd0d824de1925381f Mon Sep 17 00:00:00 2001 From: Peyman Date: Tue, 9 May 2023 17:47:28 +0430 Subject: [PATCH 1/8] Revert financial action to use REST mechanism --- .../opex/accountant/app/config/AppConfig.kt | 13 ++++++++ .../app/scheduler/FinancialActionsJob.kt | 9 ++--- .../service/FinancialActionJobManagerImpl.kt | 33 ++++++++++--------- .../core/service/OrderManagerImpl.kt | 18 +++++----- .../core/service/TradeManagerImpl.kt | 3 +- .../wallet/app/service/TransferService.kt | 9 +++++ .../src/main/resources/application.yml | 5 +-- 7 files changed, 58 insertions(+), 32 deletions(-) diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index 30080a8c9..bae4d3edd 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -22,6 +22,19 @@ import org.springframework.scheduling.annotation.EnableScheduling @EnableScheduling class AppConfig { + @Bean + fun getFinancialActionJobManager( + financialActionLoader: FinancialActionLoader, + financialActionPersister: FinancialActionPersister, + walletProxy: WalletProxy + ): FinancialActionJobManager { + return FinancialActionJobManagerImpl( + financialActionLoader, + financialActionPersister, + walletProxy + ) + } + @Bean fun orderManager( pairConfigLoader: PairConfigLoader, diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt index 08a643fb8..30a69a1d9 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/scheduler/FinancialActionsJob.kt @@ -1,5 +1,6 @@ package co.nilin.opex.accountant.app.scheduler +import co.nilin.opex.accountant.core.api.FinancialActionJobManager import kotlinx.coroutines.* import org.slf4j.LoggerFactory import org.springframework.context.annotation.Profile @@ -8,12 +9,12 @@ import org.springframework.stereotype.Service @Service @Profile("scheduled") -class FinancialActionsJob() { +class FinancialActionsJob(private val financialActionJobManager: FinancialActionJobManager) { private val log = LoggerFactory.getLogger(FinancialActionsJob::class.java) private val scope = CoroutineScope(Dispatchers.IO) - //@Scheduled(fixedDelay = 10000, initialDelay = 10000) + @Scheduled(fixedDelay = 10000, initialDelay = 10000) fun processFinancialActions() { scope.ensureActive() if (!scope.isCompleted()) @@ -22,9 +23,9 @@ class FinancialActionsJob() { scope.launch { try { //read unprocessed fa records and call transfer - //financialActionProcessor.batchProcess(0, 100) + financialActionJobManager.processFinancialActions(0, 100) } catch (e: Exception) { - log.error("Financial action manager unable to batch process", e) + log.error("Job error!", e) } } } diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt index c79004997..ab21ee9ef 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt @@ -13,28 +13,29 @@ import org.slf4j.LoggerFactory class FinancialActionJobManagerImpl( private val financialActionLoader: FinancialActionLoader, private val financialActionPersister: FinancialActionPersister, - private val financialActionPublisher: FinancialActionPublisher, + private val walletProxy: WalletProxy ) : FinancialActionJobManager { private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java) override suspend fun processFinancialActions(offset: Long, size: Long) { val factions = financialActionLoader.loadUnprocessed(offset, size) - publishFinancialActions(factions) - } - - private suspend fun publishFinancialActions(financialActions: List) { - val list = arrayListOf() - financialActions.forEach { extractFAParents(it, list) } - for (fa in list) { - if (fa.status == FinancialActionStatus.CREATED) { - try { - financialActionPublisher.publish(fa) - } catch (e: Exception) { - logger.error("Cannot publish fa ${fa.uuid}", e) - break - } - financialActionPersister.updateStatus(fa, FinancialActionStatus.PROCESSED) + factions.forEach { + try { + walletProxy.transfer( + it.symbol, + it.senderWalletType, + it.sender, + it.receiverWalletType, + it.receiver, + it.amount, + it.eventType + it.pointer, + null + ) + financialActionPersister.updateStatus(it, FinancialActionStatus.PROCESSED) + } catch (e: Exception) { + logger.error("financial job error", e) + financialActionPersister.updateStatus(it, FinancialActionStatus.ERROR) } } } diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index 5ce39ecf6..afc94a6ab 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -96,9 +96,9 @@ open class OrderManagerImpl( OrderStatus.REQUESTED.code ) ) - val fa = financialActionPersister.persist(listOf(financialAction)) - publishFinancialAction(financialAction) - return fa + return financialActionPersister.persist(listOf(financialAction)) + /*publishFinancialAction(financialAction) + return fa*/ } @Transactional @@ -161,9 +161,9 @@ open class OrderManagerImpl( OrderStatus.REJECTED ) ) - val fa = financialActionPersister.persist(listOf(financialAction)) - publishFinancialAction(financialAction) - return fa + return financialActionPersister.persist(listOf(financialAction)) + /*publishFinancialAction(financialAction) + return fa*/ } @Transactional @@ -207,9 +207,9 @@ open class OrderManagerImpl( OrderStatus.CANCELED ) ) - val fa = financialActionPersister.persist(listOf(financialAction)) - publishFinancialAction(financialAction) - return fa + return financialActionPersister.persist(listOf(financialAction)) + /*publishFinancialAction(financialAction) + return fa*/ } private suspend fun publishRichOrder(order: Order, remainedQuantity: BigDecimal, status: OrderStatus? = null) { diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt index 5e959ea26..d12e81bbe 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt @@ -169,7 +169,8 @@ open class TradeManagerImpl( trade.eventDate ) ) - return financeActionPersister.persist(financialActions).also { publishFinancialActions(it) } + return financeActionPersister.persist(financialActions) + //return financeActionPersister.persist(financialActions).also { publishFinancialActions(it) } } private suspend fun publishTakerRichOrderUpdate(takerOrder: Order, trade: TradeEvent) { diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/TransferService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/TransferService.kt index 4f3ca3ed8..8c45e397b 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/TransferService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/TransferService.kt @@ -10,6 +10,7 @@ import co.nilin.opex.wallet.core.spi.CurrencyService import co.nilin.opex.wallet.core.spi.TransferManager import co.nilin.opex.wallet.core.spi.WalletManager import co.nilin.opex.wallet.core.spi.WalletOwnerManager +import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal @@ -22,6 +23,8 @@ class TransferService( private val walletOwnerManager: WalletOwnerManager ) { + private val logger = LoggerFactory.getLogger(TransferService::class.java) + @Transactional suspend fun transfer( symbol: String, @@ -54,6 +57,12 @@ class TransferService( currency, receiverWalletType ) + + logger.info( + "Transferring funds: $amount ${sourceWallet.owner.id}-${sourceWallet.currency.symbol}-$senderWalletType " + + "==> ${receiverWallet.owner.id}-${receiverWallet.currency.symbol}-$receiverWalletType " + ) + return transferManager.transfer( TransferCommand( sourceWallet, diff --git a/wallet/wallet-app/src/main/resources/application.yml b/wallet/wallet-app/src/main/resources/application.yml index 95fb7eb9f..efb397c1e 100644 --- a/wallet/wallet-app/src/main/resources/application.yml +++ b/wallet/wallet-app/src/main/resources/application.yml @@ -63,7 +63,8 @@ app: uuid: 1 logging: level: + root: INFO org.apache.kafka: ERROR - co.nilin: DEBUG - reactor.netty.http.client: DEBUG + co.nilin: INFO + reactor.netty.http.client: INFO swagger.authUrl: ${SWAGGER_AUTH_URL:https://api.opex.dev/auth}/realms/opex/protocol/openid-connect/token From 16c342a408eb4f655e1013301552b9970f4d5764 Mon Sep 17 00:00:00 2001 From: maryarm <45322329+maryarm@users.noreply.github.com> Date: Thu, 18 May 2023 16:23:28 +0200 Subject: [PATCH 2/8] fixed #357: Checks for parent status during sync of financial actions --- accountant/accountant-app/pom.xml | 47 +++++++++++++++++ .../app/utils/PrometheusHealthExtension.kt | 2 + .../opex/accountant/app/AccountantAppTest.kt | 17 +++++++ .../src/test/resources/application.yml | 50 +++++++++++++++++++ .../service/FinancialActionJobManagerImpl.kt | 18 ++++--- .../core/spi/FinancialActionLoader.kt | 2 + .../core/spi/FinancialActionPersister.kt | 1 + .../postgres/dao/FinancialActionRepository.kt | 13 +++-- .../impl/FinancialActionLoaderImpl.kt | 17 +++++-- .../impl/FinancialActionPersisterImpl.kt | 7 +++ .../src/test/resources/application.yml | 2 +- 11 files changed, 160 insertions(+), 16 deletions(-) create mode 100644 accountant/accountant-app/src/test/kotlin/co/nilin/opex/accountant/app/AccountantAppTest.kt create mode 100644 accountant/accountant-app/src/test/resources/application.yml diff --git a/accountant/accountant-app/pom.xml b/accountant/accountant-app/pom.xml index 2a49ffc42..fdce361da 100644 --- a/accountant/accountant-app/pom.xml +++ b/accountant/accountant-app/pom.xml @@ -72,6 +72,53 @@ co.nilin.opex.utility.preferences preferences + + org.testcontainers + testcontainers + 1.18.0 + test + + + org.testcontainers + postgresql + 1.18.0 + test + + + org.testcontainers + r2dbc + 1.18.0 + test + + + + org.springframework.cloud + spring-cloud-stream + 3.2.6 + test-jar + test + test-binder + + + org.springframework.kafka + spring-kafka-test + 2.9.4 + test + + + org.testcontainers + kafka + 1.18.0 + test + + + org.mockito.kotlin + mockito-kotlin + + + io.mockk + mockk + diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/utils/PrometheusHealthExtension.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/utils/PrometheusHealthExtension.kt index 41763d3c0..28ec8def3 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/utils/PrometheusHealthExtension.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/utils/PrometheusHealthExtension.kt @@ -5,10 +5,12 @@ import io.micrometer.core.instrument.MeterRegistry import org.springframework.boot.actuate.health.HealthComponent import org.springframework.boot.actuate.health.HealthEndpoint import org.springframework.boot.actuate.health.SystemHealth +import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component @Component +@Profile("!test") class PrometheusHealthExtension( private val registry: MeterRegistry, private val endpoint: HealthEndpoint diff --git a/accountant/accountant-app/src/test/kotlin/co/nilin/opex/accountant/app/AccountantAppTest.kt b/accountant/accountant-app/src/test/kotlin/co/nilin/opex/accountant/app/AccountantAppTest.kt new file mode 100644 index 000000000..bda16916e --- /dev/null +++ b/accountant/accountant-app/src/test/kotlin/co/nilin/opex/accountant/app/AccountantAppTest.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.accountant.app + +import org.junit.jupiter.api.Test +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration +import org.springframework.context.annotation.Import +import org.springframework.test.context.ActiveProfiles + +@SpringBootTest +@ActiveProfiles("test") +@Import(TestChannelBinderConfiguration::class) +class AccountantAppTest { + @Test + fun contextLoad() { + + } +} \ No newline at end of file diff --git a/accountant/accountant-app/src/test/resources/application.yml b/accountant/accountant-app/src/test/resources/application.yml new file mode 100644 index 000000000..68eb6bb9f --- /dev/null +++ b/accountant/accountant-app/src/test/resources/application.yml @@ -0,0 +1,50 @@ +server.port: 8080 +logging: + level: + co.nilin: DEBUG + reactor.netty.http.client: DEBUG +spring: + application: + name: opex-accountant + main: + allow-bean-definition-overriding: true + allow-circular-references: true + kafka: + bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092} + consumer: + group-id: accountant + r2dbc: + url: r2dbc:tc:postgresql:///accountant?TC_IMAGE_TAG=9.6.8 + initialization-mode: always + cloud: + bootstrap: + enabled: true + discovery: + enabled: false + consul: + enabled: false + config: + enabled: false + vault: + enabled: false + +management: + health: + vault: + enabled: false + endpoints: + web: + base-path: /actuator + exposure: + include: ["health", "metrics"] + endpoint: + health: + show-details: always + metrics: + enabled: true + prometheus: + enabled: false +app: + address: 1 + wallet: + url: "" \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt index ab21ee9ef..5a488c3e1 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FinancialActionJobManagerImpl.kt @@ -1,12 +1,10 @@ package co.nilin.opex.accountant.core.service import co.nilin.opex.accountant.core.api.FinancialActionJobManager -import co.nilin.opex.accountant.core.inout.TransferRequest import co.nilin.opex.accountant.core.model.FinancialAction import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.spi.FinancialActionLoader import co.nilin.opex.accountant.core.spi.FinancialActionPersister -import co.nilin.opex.accountant.core.spi.FinancialActionPublisher import co.nilin.opex.accountant.core.spi.WalletProxy import org.slf4j.LoggerFactory @@ -18,10 +16,17 @@ class FinancialActionJobManagerImpl( private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java) - override suspend fun processFinancialActions(offset: Long, size: Long) { - val factions = financialActionLoader.loadUnprocessed(offset, size) + override suspend fun processFinancialActions(offset: Long, size: Long){ + val factions = financialActionLoader.loadReadyToProcess(offset, size) factions.forEach { try { + if (it.parent != null) { + val reloadParent = financialActionLoader.loadFinancialAction(it.parent.id)!! + if (reloadParent.status != FinancialActionStatus.PROCESSED) { + logger.warn("financial job {} skipped because of parent status {}", it, reloadParent) + return@forEach + } + } walletProxy.transfer( it.symbol, it.senderWalletType, @@ -32,10 +37,11 @@ class FinancialActionJobManagerImpl( it.eventType + it.pointer, null ) - financialActionPersister.updateStatus(it, FinancialActionStatus.PROCESSED) + financialActionPersister.updateStatusNewTx(it, FinancialActionStatus.PROCESSED) + } catch (e: Exception) { logger.error("financial job error", e) - financialActionPersister.updateStatus(it, FinancialActionStatus.ERROR) + financialActionPersister.updateStatusNewTx(it, FinancialActionStatus.ERROR) } } } diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt index 72fe8cd77..398ab30c9 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionLoader.kt @@ -6,4 +6,6 @@ interface FinancialActionLoader { suspend fun findLast(userUuid: String, ouid: String): FinancialAction? suspend fun loadUnprocessed(offset: Long, size: Long): List suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long + suspend fun loadReadyToProcess(offset: Long, size: Long): List + suspend fun loadFinancialAction(id: Long?): FinancialAction? } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt index 8da7fbfef..a97924103 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/FinancialActionPersister.kt @@ -14,4 +14,5 @@ interface FinancialActionPersister { suspend fun updateStatus(faUuid: String, status: FinancialActionStatus) suspend fun updateBatchStatus(financialAction: List, status: FinancialActionStatus) + suspend fun updateStatusNewTx(financialAction: FinancialAction, status: FinancialActionStatus) } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt index e682148f9..b99fd1d58 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/FinancialActionRepository.kt @@ -21,16 +21,16 @@ interface FinancialActionRepository : ReactiveCrudRepository - @Query("select count(1) from fi_actions fi where fi.sender = :uuid and fi.symbol = :symbol and fi.event_type = :eventType and fi.status = :status") - fun findByUuidAndSymbolAndEventTypeAndStatus( + @Query("select count(1) from fi_actions fi where fi.sender = :uuid and fi.symbol = :symbol and fi.event_type = :eventType and fi.status != :status") + fun countByUuidAndSymbolAndEventTypeAndStatusNot( @Param("uuid") uuid: String, @Param("symbol") symbol: String, @Param("eventType") eventType: String, @Param("status") financialActionStatus: FinancialActionStatus ): Mono - @Query("select * from fi_actions fi where status = :status") - fun findByStatus(@Param("status") status: String, paging: Pageable): Flow + @Query("select * from fi_actions fi where status != :status") + fun findByStatusNot(@Param("status") status: String, paging: Pageable): Flow @Query("update fi_actions set status = :status where id = :id") fun updateStatus(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono @@ -40,4 +40,9 @@ interface FinancialActionRepository : ReactiveCrudRepository, status: FinancialActionStatus): Mono + @Query("select * from fi_actions fi where status = 'CREATED' " + + "and ( parent_id is null " + + " or 'ERROR' != (select pfi.status from fi_actions pfi where pfi.id = fi.parent_id)" + + ")") + fun findReadyToProcess(of: Pageable): Flow } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt index 34d5c35aa..29a953423 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionLoaderImpl.kt @@ -18,8 +18,15 @@ import java.math.BigDecimal class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRepository) : FinancialActionLoader { override suspend fun loadUnprocessed(offset: Long, size: Long): List { - return financialActionRepository.findByStatus( - FinancialActionStatus.CREATED.name, + return financialActionRepository.findByStatusNot( + FinancialActionStatus.PROCESSED.name, + PageRequest.of(offset.toInt(), size.toInt(), Sort.by(Sort.Direction.ASC, "createDate")) + ).map { loadFinancialAction(it.id)!! } + .toList() + } + + override suspend fun loadReadyToProcess(offset: Long, size: Long): List { + return financialActionRepository.findReadyToProcess( PageRequest.of(offset.toInt(), size.toInt(), Sort.by(Sort.Direction.ASC, "createDate")) ).map { loadFinancialAction(it.id)!! } .toList() @@ -33,15 +40,15 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe } override suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long { - return financialActionRepository.findByUuidAndSymbolAndEventTypeAndStatus( + return financialActionRepository.countByUuidAndSymbolAndEventTypeAndStatusNot( userUuid, symbol, eventType, - FinancialActionStatus.CREATED + FinancialActionStatus.PROCESSED ).awaitFirstOrElse { BigDecimal.ZERO }.toLong() } - private suspend fun loadFinancialAction(id: Long?): FinancialAction? { + override suspend fun loadFinancialAction(id: Long?): FinancialAction? { if (id != null) { val fim = financialActionRepository.findById(id).awaitFirst() return FinancialAction( diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt index b9fd7e1aa..e5f852330 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/FinancialActionPersisterImpl.kt @@ -9,6 +9,8 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional @Component class FinancialActionPersisterImpl(private val financialActionRepository: FinancialActionRepository) : @@ -60,9 +62,14 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ ).awaitSingle() } + override suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) { financialActionRepository.updateStatus(financialAction.id!!, status).awaitSingleOrNull() } + @Transactional(propagation = Propagation.REQUIRES_NEW) + override suspend fun updateStatusNewTx(financialAction: FinancialAction, status: FinancialActionStatus) { + financialActionRepository.updateStatus(financialAction.id!!, status).awaitSingleOrNull() + } override suspend fun updateStatus(faUuid: String, status: FinancialActionStatus) { financialActionRepository.updateStatus(faUuid, status).awaitSingleOrNull() diff --git a/wallet/wallet-app/src/test/resources/application.yml b/wallet/wallet-app/src/test/resources/application.yml index 8246bdf42..5280c19d1 100644 --- a/wallet/wallet-app/src/test/resources/application.yml +++ b/wallet/wallet-app/src/test/resources/application.yml @@ -30,7 +30,7 @@ spring: auto-offset-reset: earliest group-id: wallet r2dbc: - url: r2dbc:tc:postgresql:///databasename?TC_IMAGE_TAG=9.6.8 + url: r2dbc:tc:postgresql:///wallet?TC_IMAGE_TAG=9.6.8 initialization-mode: always cloud: bootstrap: From 22bceabf8c92886f7b37987edb0ffa75d652e21b Mon Sep 17 00:00:00 2001 From: Peyman Date: Tue, 6 Jun 2023 15:44:54 +0330 Subject: [PATCH 3/8] Update preferences --- .../main/kotlin/co/nilin/opex/utility/preferences/Chain.kt | 2 +- .../co/nilin/opex/utility/preferences/ChainSyncSchedule.kt | 3 ++- .../co/nilin/opex/utility/preferences/Preferences.kt | 2 +- .../kotlin/co/nilin/opex/utility/preferences/Scanner.kt | 7 ++++++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Chain.kt b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Chain.kt index 45258a743..4482fdb0d 100644 --- a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Chain.kt +++ b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Chain.kt @@ -4,5 +4,5 @@ data class Chain( var name: String = "", var addressType: String = "", val scanners: List = emptyList(), - var schedule: ChainSyncSchedule? = null + var schedules: List = emptyList() ) diff --git a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/ChainSyncSchedule.kt b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/ChainSyncSchedule.kt index a61564571..0fff4a388 100644 --- a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/ChainSyncSchedule.kt +++ b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/ChainSyncSchedule.kt @@ -1,10 +1,11 @@ package co.nilin.opex.utility.preferences data class ChainSyncSchedule( + var workerType: String = "MAIN", var delay: Long = 600, - var errorDelay: Long = 60, var timeout: Int = 30, var maxRetries: Int = 5, var confirmations: Int = 0, + var maxBlockCount: Int = 10, var enabled: Boolean = true ) diff --git a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Preferences.kt b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Preferences.kt index 4da1bf3f7..1ede6e700 100644 --- a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Preferences.kt +++ b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Preferences.kt @@ -5,7 +5,7 @@ data class Preferences( var chains: List = emptyList(), var currencies: List = emptyList(), var markets: List = emptyList(), - var userLimits: List = emptyList(), + var userLimits: List = emptyList(), var userLevels: List = emptyList(), var system: System = System(), val auth: Auth = Auth() diff --git a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Scanner.kt b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Scanner.kt index 1f0bf4355..25414a50b 100644 --- a/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Scanner.kt +++ b/utility/preferences/src/main/kotlin/co/nilin/opex/utility/preferences/Scanner.kt @@ -1,3 +1,8 @@ package co.nilin.opex.utility.preferences -data class Scanner(var url: String = "", var maxBlockRange: Int = 30, var delayOnRateLimit: Int = 300) +data class Scanner( + var url: String = "", + var maxBlockRange: Int = 30, + var delayOnRateLimit: Int = 300, + var maxParallelCall: Int = 3 +) From d39e0b323b888cd0118bf13d53226ba4997b028a Mon Sep 17 00:00:00 2001 From: Peyman Date: Sun, 18 Jun 2023 15:11:32 +0330 Subject: [PATCH 4/8] Add sync service without chain in path variable --- .../bcgateway/app/controller/WalletSyncController.kt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/WalletSyncController.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/WalletSyncController.kt index 710e25b6e..c8f1fe3ac 100644 --- a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/WalletSyncController.kt +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/WalletSyncController.kt @@ -14,11 +14,12 @@ import org.springframework.web.bind.annotation.RestController @RestController class WalletSyncController(private val chainHandler: ChainHandler, private val walletSyncService: WalletSyncService) { + private val logger: Logger by LoggerDelegate() @PutMapping("wallet-sync/{chain}") suspend fun syncTransferOnChain(@PathVariable chain: String, @RequestBody transfers: List) { - logger.debug("Received ${transfers.size} transfer(s) for chain: $chain") + logger.info("Received ${transfers.size} transfer(s) for chain: $chain") runCatching { chainHandler.fetchChainInfo(chain) }.onFailure { @@ -27,4 +28,10 @@ class WalletSyncController(private val chainHandler: ChainHandler, private val w walletSyncService.syncTransfers(transfers) } } + + @PutMapping("wallet-sync") + suspend fun syncTransfers(@RequestBody transfers: List) { + logger.info("Received ${transfers.size} transfer(s)") + walletSyncService.syncTransfers(transfers) + } } From 771d162e818146070aba9264d99d0aea62bb00cb Mon Sep 17 00:00:00 2001 From: Peyman Date: Sun, 18 Jun 2023 16:11:46 +0330 Subject: [PATCH 5/8] Update vault config --- docker-images/vault/workflow-vault.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docker-images/vault/workflow-vault.sh b/docker-images/vault/workflow-vault.sh index 10f04d7bf..172e5b1d2 100755 --- a/docker-images/vault/workflow-vault.sh +++ b/docker-images/vault/workflow-vault.sh @@ -58,11 +58,12 @@ init_secrets() { vault write auth/app-id/map/app-id/ethereum-scanner value=backend-policy display_name=ethereum-scanner vault write auth/app-id/map/app-id/tron-scanner value=backend-policy display_name=tron-scanner vault write auth/app-id/map/app-id/scanner-scheduler value=backend-policy display_name=scanner-scheduler + vault write auth/app-id/map/app-id/scanner-liaison value=backend-policy display_name=scanner-liaison vault write auth/app-id/map/app-id/opex-referral value=backend-policy display_name=opex-referral ## Enable user-id vault write auth/app-id/map/user-id/${BACKEND_USER} \ - value=opex-wallet,opex-websocket,opex-eventlog,opex-auth,opex-accountant,opex-api,opex-market,opex-bc-gateway,opex-payment,opex-admin,bitcoin-scanner,ethereum-scanner,tron-scanner,scanner-scheduler,opex-referral + value=opex-wallet,opex-websocket,opex-eventlog,opex-auth,opex-accountant,opex-api,opex-market,opex-bc-gateway,opex-payment,opex-admin,bitcoin-scanner,ethereum-scanner,tron-scanner,scanner-scheduler,scanner-liaison,opex-referral ## Check login app-id vault write auth/app-id/login/opex-accountant user_id=${BACKEND_USER} @@ -79,6 +80,7 @@ init_secrets() { vault write auth/app-id/login/ethereum-scanner user_id=${BACKEND_USER} vault write auth/app-id/login/tron-scanner user_id=${BACKEND_USER} vault write auth/app-id/login/scanner-scheduler user_id=${BACKEND_USER} + vault write auth/app-id/login/scanner-liaison user_id=${BACKEND_USER} vault write auth/app-id/login/opex-referral user_id=${BACKEND_USER} ## Add secret values @@ -97,6 +99,7 @@ init_secrets() { vault kv put secret/ethereum-scanner dbusername=${DB_USER} dbpassword=${DB_PASS} vault kv put secret/tron-scanner dbusername=${DB_USER} dbpassword=${DB_PASS} vault kv put secret/scanner-scheduler dbusername=${DB_USER} dbpassword=${DB_PASS} + vault kv put secret/scanner-liaison dbusername=${DB_USER} dbpassword=${DB_PASS} vault kv put secret/opex-referral dbusername=${DB_USER} dbpassword=${DB_PASS} db_read_only_username=${DB_READ_ONLY_USER} db_read_only_pass=${DB_READ_ONLY_PASS} } From 7a3166a708fa94cd4588aa144b760f197d1af016 Mon Sep 17 00:00:00 2001 From: Peyman Date: Wed, 21 Jun 2023 11:34:10 +0330 Subject: [PATCH 6/8] Cancel order event processing fixed (#360) * Fix order request event * Close #359: Fixed cancel order request event processing * Fix test failures * Revert CancelOrderEvent class type on cancel order --- .../accountant/app/listener/OrderListener.kt | 38 ++++---- .../src/main/resources/application.yml | 4 +- .../core/service/OrderManagerImpl.kt | 4 + .../core/service/OrderManagerImplTest.kt | 19 +++- .../listener/config/AccountantKafkaConfig.kt | 2 +- .../listener/consumer/OrderKafkaListener.kt | 4 +- .../listener/inout/OrderCancelRequestEvent.kt | 10 +++ .../kafka/listener/inout/OrderRequestEvent.kt | 5 ++ ...tRequest.kt => OrderSubmitRequestEvent.kt} | 18 ++-- .../spi/OrderSubmitRequestListener.kt | 4 +- .../listener/MatchingEngineEventListener.kt | 33 ------- .../engine/app/listener/OrderListener.kt | 49 +++++++---- .../core/inout/OrderCancelRequestEvent.kt | 10 +++ .../engine/core/inout/OrderRequestEvent.kt | 5 ++ .../engine/core/inout/OrderSubmitRequest.kt | 19 ---- .../core/inout/OrderSubmitRequestEvent.kt | 19 ++++ .../kafka/listener/config/OrderKafkaConfig.kt | 11 +-- .../listener/consumer/OrderKafkaListener.kt | 16 ++-- .../listener/spi/OrderRequestEventListener.kt | 8 ++ .../spi/OrderSubmitRequestListener.kt | 8 -- .../submitter/config/EventsKafkaConfig.kt | 7 +- .../gateway/app/service/OrderService.kt | 18 ++-- .../gateway/app/service/OrderServiceTest.kt | 24 ++---- .../submitter/config/OrderKafkaConfig.kt | 10 ++- .../inout/OrderCancelRequestEvent.kt | 10 +++ .../submitter/inout/OrderRequestEvent.kt | 5 ++ ...tRequest.kt => OrderSubmitRequestEvent.kt} | 9 +- ...itter.kt => OrderRequestEventSubmitter.kt} | 10 +-- preferences-dev.yml | 86 ++++++++++++++----- 29 files changed, 276 insertions(+), 189 deletions(-) create mode 100644 accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt create mode 100644 accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt rename accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/{OrderSubmitRequest.kt => OrderSubmitRequestEvent.kt} (64%) create mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt create mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt delete mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt create mode 100644 matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt create mode 100644 matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt delete mode 100644 matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt create mode 100644 matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt create mode 100644 matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt rename matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/{OrderSubmitRequest.kt => OrderSubmitRequestEvent.kt} (80%) rename matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/{OrderSubmitter.kt => OrderRequestEventSubmitter.kt} (69%) diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt index 52f95ef56..3ebd44422 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/OrderListener.kt @@ -1,7 +1,8 @@ package co.nilin.opex.accountant.app.listener import co.nilin.opex.accountant.core.api.OrderManager -import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent import kotlinx.coroutines.runBlocking @@ -15,25 +16,26 @@ class OrderListener(private val orderManager: OrderManager) : OrderSubmitRequest return "OrderListener" } - override fun onEvent(event: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { + override fun onEvent(event: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) { runBlocking { - logger.info("Order submit event received ${event.ouid}") - - orderManager.handleRequestOrder( - SubmitOrderEvent( - event.ouid, - event.uuid, - event.orderId, - event.pair, - event.price, - event.quantity, - event.quantity, - event.direction, - event.matchConstraint, - event.orderType, - event.userLevel + if (event is OrderSubmitRequestEvent) { + logger.info("Order submit event received ${event.ouid}") + orderManager.handleRequestOrder( + SubmitOrderEvent( + event.ouid, + event.uuid, + event.orderId, + event.pair, + event.price, + event.quantity, + event.quantity, + event.direction, + event.matchConstraint, + event.orderType, + event.userLevel + ) ) - ) + } } } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/resources/application.yml b/accountant/accountant-app/src/main/resources/application.yml index 0bf62e5a8..a41c93880 100644 --- a/accountant/accountant-app/src/main/resources/application.yml +++ b/accountant/accountant-app/src/main/resources/application.yml @@ -1,8 +1,8 @@ server.port: 8080 logging: level: - co.nilin: DEBUG - reactor.netty.http.client: DEBUG + co.nilin: INFO + reactor.netty.http.client: INFO spring: application: name: opex-accountant diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt index afc94a6ab..34456db1d 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImpl.kt @@ -9,6 +9,7 @@ import co.nilin.opex.accountant.core.model.FinancialActionStatus import co.nilin.opex.accountant.core.model.Order import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.* +import co.nilin.opex.matching.engine.core.inout.RequestedOperation import co.nilin.opex.matching.engine.core.model.OrderDirection import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal @@ -122,6 +123,9 @@ open class OrderManagerImpl( @Transactional override suspend fun handleRejectOrder(rejectOrderEvent: RejectOrderEvent): List { + if (rejectOrderEvent.requestedOperation != RequestedOperation.PLACE_ORDER) + return emptyList() + //order by ouid val order = orderPersister.load(rejectOrderEvent.ouid) if (order == null) { diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt index 9406ff297..6b50ff506 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/OrderManagerImplTest.kt @@ -222,7 +222,7 @@ internal class OrderManagerImplTest { "user_1", 56, Pair("BTC", "USDT"), - RequestedOperation.CANCEL_ORDER, + RequestedOperation.PLACE_ORDER, RejectReason.ORDER_NOT_FOUND ) @@ -234,6 +234,21 @@ internal class OrderManagerImplTest { coVerify(exactly = 1) { tempEventPersister.saveTempEvent(any(), any()) } } + @Test + fun givenRejectOrderReceived_whenOperationNotPlaceOrder_returnEmptyFA(): Unit = runBlocking { + val orderEvent = RejectOrderEvent( + "ouid", + "user_1", + 56, + Pair("BTC", "USDT"), + RequestedOperation.CANCEL_ORDER, + RejectReason.ORDER_NOT_FOUND + ) + + val fa = orderManager.handleRejectOrder(orderEvent) + assertThat(fa.size).isEqualTo(0) + } + @Test fun givenRejectOrderReceived_whenLocalFound_publishRichOrderUpdate(): Unit = runBlocking { val orderEvent = RejectOrderEvent( @@ -246,7 +261,7 @@ internal class OrderManagerImplTest { OrderDirection.BID, MatchConstraint.GTC, OrderType.LIMIT_ORDER, - RequestedOperation.CANCEL_ORDER, + RequestedOperation.PLACE_ORDER, RejectReason.ORDER_NOT_FOUND, ) coEvery { orderPersister.load(any()) } returns Valid.order diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt index 096fac5d9..f6c45d6fa 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt @@ -39,7 +39,7 @@ class AccountantKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent" ) } diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt index 08b3d7df5..b01ae2c9a 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/OrderKafkaListener.kt @@ -1,8 +1,8 @@ package co.nilin.opex.accountant.ports.kafka.listener.consumer -import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent import co.nilin.opex.accountant.ports.kafka.listener.spi.OrderSubmitRequestListener import org.springframework.stereotype.Component @Component -class OrderKafkaListener : EventConsumer() \ No newline at end of file +class OrderKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt new file mode 100644 index 000000000..c10c325ac --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderCancelRequestEvent.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.accountant.ports.kafka.listener.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderCancelRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val orderId: Long +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt new file mode 100644 index 000000000..04876ee6f --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderRequestEvent.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.accountant.ports.kafka.listener.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair) \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequest.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt similarity index 64% rename from accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequest.kt rename to accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt index 6790172ae..71e7275da 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequest.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt @@ -5,15 +5,15 @@ import co.nilin.opex.matching.engine.core.model.OrderDirection import co.nilin.opex.matching.engine.core.model.OrderType import co.nilin.opex.matching.engine.core.model.Pair -data class OrderSubmitRequest( - val ouid: String, - val uuid: String, - val orderId: Long?, - val pair: Pair, - val price: Long = 0, - val quantity: Long = 0, +class OrderSubmitRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val price: Long, + val quantity: Long, val direction: OrderDirection, val matchConstraint: MatchConstraint, val orderType: OrderType, - val userLevel: String -) \ No newline at end of file + val userLevel: String, + val orderId: Long? = null, +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt index ac9a24904..ae224ab1d 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/OrderSubmitRequestListener.kt @@ -1,5 +1,5 @@ package co.nilin.opex.accountant.ports.kafka.listener.spi -import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent -interface OrderSubmitRequestListener : Listener \ No newline at end of file +interface OrderSubmitRequestListener : Listener \ No newline at end of file diff --git a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt index 0725120d0..682e7560d 100644 --- a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt +++ b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/MatchingEngineEventListener.kt @@ -1,13 +1,7 @@ package co.nilin.opex.matching.engine.app.listener -import co.nilin.opex.matching.engine.app.bl.OrderBooks -import co.nilin.opex.matching.engine.core.eventh.events.CancelOrderEvent import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.engine.core.eventh.events.EditOrderRequestEvent -import co.nilin.opex.matching.engine.core.inout.OrderCancelCommand -import co.nilin.opex.matching.engine.core.inout.OrderEditCommand import co.nilin.opex.matching.engine.ports.kafka.listener.spi.EventListener -import kotlinx.coroutines.runBlocking import org.slf4j.LoggerFactory class MatchingEngineEventListener : EventListener { @@ -20,32 +14,5 @@ class MatchingEngineEventListener : EventListener { override fun onEvent(event: CoreEvent, partition: Int, offset: Long, timestamp: Long) { logger.info("Received CoreEvent: ${event::class.java}") - - runBlocking { - val orderBook = OrderBooks.lookupOrderBook("${event.pair.leftSideName}_${event.pair.rightSideName}") - - when (event) { - is EditOrderRequestEvent -> orderBook.handleEditCommand( - OrderEditCommand( - event.ouid, - event.uuid, - event.orderId, - event.pair, - event.price, - event.quantity - ) - ) - - is CancelOrderEvent -> orderBook.handleCancelCommand( - OrderCancelCommand( - event.ouid, - event.uuid, - event.orderId, - event.pair - ) - ) - else -> null - } - } } } \ No newline at end of file diff --git a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt index dc55ef4f2..b19ab8291 100644 --- a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt +++ b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt @@ -1,32 +1,49 @@ package co.nilin.opex.matching.engine.app.listener import co.nilin.opex.matching.engine.app.bl.OrderBooks -import co.nilin.opex.matching.engine.core.inout.OrderCreateCommand -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest -import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderSubmitRequestListener +import co.nilin.opex.matching.engine.core.inout.* +import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderRequestEventListener +import org.slf4j.LoggerFactory -class OrderListener : OrderSubmitRequestListener { +class OrderListener : OrderRequestEventListener { + + private val logger = LoggerFactory.getLogger(OrderListener::class.java) override fun id(): String { return "OrderListener" } - override suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { + override suspend fun onOrder(order: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) { + logger.info("OrderRequestEvent received. ${order::class.java.simpleName} ouid=${order.ouid}") val orderBook = OrderBooks.lookupOrderBook( order.pair.leftSideName + "_" + order.pair.rightSideName ) - orderBook.handleNewOrderCommand( - OrderCreateCommand( - order.ouid, - order.uuid, - order.pair, - order.price, - order.quantity, - order.direction, - order.matchConstraint, - order.orderType + + when (order) { + is OrderSubmitRequestEvent -> orderBook.handleNewOrderCommand( + OrderCreateCommand( + order.ouid, + order.uuid, + order.pair, + order.price, + order.quantity, + order.direction, + order.matchConstraint, + order.orderType + ) ) - ) + + is OrderCancelRequestEvent -> orderBook.handleCancelCommand( + OrderCancelCommand( + order.ouid, + order.uuid, + order.orderId, + order.pair + ) + ) + + else -> logger.warn("Unknown event type of OrderRequestEvent") + } } } \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt new file mode 100644 index 000000000..50cd7da61 --- /dev/null +++ b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderCancelRequestEvent.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.matching.engine.core.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderCancelRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val orderId: Long +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt new file mode 100644 index 000000000..3ec793a34 --- /dev/null +++ b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderRequestEvent.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.matching.engine.core.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair) \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt deleted file mode 100644 index e8c0e2fb2..000000000 --- a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt +++ /dev/null @@ -1,19 +0,0 @@ -package co.nilin.opex.matching.engine.core.inout - -import co.nilin.opex.matching.engine.core.model.MatchConstraint -import co.nilin.opex.matching.engine.core.model.OrderDirection -import co.nilin.opex.matching.engine.core.model.OrderType -import co.nilin.opex.matching.engine.core.model.Pair - -class OrderSubmitRequest( - var ouid: String, - var uuid: String, - var pair: Pair, - var orderId: Long? = null, - var price: Long = 0, - var quantity: Long = 0, - var direction: OrderDirection = OrderDirection.BID, - var matchConstraint: MatchConstraint = MatchConstraint.GTC, - var orderType: OrderType = OrderType.LIMIT_ORDER, - var userLevel: String = "" -) \ No newline at end of file diff --git a/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt new file mode 100644 index 000000000..96d3e03a1 --- /dev/null +++ b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequestEvent.kt @@ -0,0 +1,19 @@ +package co.nilin.opex.matching.engine.core.inout + +import co.nilin.opex.matching.engine.core.model.MatchConstraint +import co.nilin.opex.matching.engine.core.model.OrderDirection +import co.nilin.opex.matching.engine.core.model.OrderType +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderSubmitRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val price: Long, + val quantity: Long, + val direction: OrderDirection, + val matchConstraint: MatchConstraint, + val orderType: OrderType, + val userLevel: String, + val orderId: Long? = null, +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt index 2817f2c00..6262ba1df 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt @@ -1,7 +1,8 @@ package co.nilin.opex.matching.engine.ports.kafka.listener.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent import co.nilin.opex.matching.engine.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.matching.engine.ports.kafka.listener.consumer.OrderKafkaListener import org.apache.kafka.clients.consumer.ConsumerConfig @@ -40,12 +41,12 @@ class OrderKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.matching.engine.core.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.matching.engine.core.inout.OrderCancelRequestEvent" ) } @Bean("orderConsumerFactory") - fun consumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { + fun consumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @@ -57,8 +58,8 @@ class OrderKafkaConfig { @Autowired fun configureListener( orderKafkaListener: OrderKafkaListener, - @Qualifier("orderKafkaTemplate") template: KafkaTemplate, - @Qualifier("orderConsumerFactory") consumerFactory: ConsumerFactory + @Qualifier("orderKafkaTemplate") template: KafkaTemplate, + @Qualifier("orderConsumerFactory") consumerFactory: ConsumerFactory ) { val topics = symbols.map { s -> "orders_$s" }.toTypedArray() val containerProps = ContainerProperties(*topics) diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt index 69a6327d2..16fd32a75 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt @@ -1,16 +1,18 @@ package co.nilin.opex.matching.engine.ports.kafka.listener.consumer -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest -import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderSubmitRequestListener +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent +import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderRequestEventListener import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.kafka.listener.MessageListener import org.springframework.stereotype.Component @Component -class OrderKafkaListener : MessageListener { - val orderListeners = arrayListOf() - override fun onMessage(data: ConsumerRecord) { +class OrderKafkaListener : MessageListener { + + val orderListeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { orderListeners.forEach { tl -> runBlocking { tl.onOrder(data.value(), data.partition(), data.offset(), data.timestamp()) @@ -18,11 +20,11 @@ class OrderKafkaListener : MessageListener { } } - fun addOrderListener(tl: OrderSubmitRequestListener) { + fun addOrderListener(tl: OrderRequestEventListener) { orderListeners.add(tl) } - fun removeOrderListener(tl: OrderSubmitRequestListener) { + fun removeOrderListener(tl: OrderRequestEventListener) { orderListeners.removeIf { item -> item.id() == tl.id() } diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt new file mode 100644 index 000000000..32342e348 --- /dev/null +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderRequestEventListener.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.matching.engine.ports.kafka.listener.spi + +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent + +interface OrderRequestEventListener { + fun id(): String + suspend fun onOrder(order: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) +} \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt deleted file mode 100644 index e2268cef6..000000000 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt +++ /dev/null @@ -1,8 +0,0 @@ -package co.nilin.opex.matching.engine.ports.kafka.listener.spi - -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest - -interface OrderSubmitRequestListener { - fun id(): String - suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) -} \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt index 53828b3b3..6fdf44e04 100644 --- a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt +++ b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt @@ -1,7 +1,8 @@ package co.nilin.opex.matching.engine.ports.kafka.submitter.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest +import co.nilin.opex.matching.engine.core.inout.OrderRequestEvent +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequestEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Qualifier @@ -40,12 +41,12 @@ class EventsKafkaConfig { } @Bean("orderProducerFactory") - fun orderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + fun orderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @Bean("orderKafkaTemplate") - fun orderKafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + fun orderKafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } diff --git a/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt b/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt index 05b2f0fa5..432e778c1 100644 --- a/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt +++ b/matching-gateway/matching-gateway-app/src/main/kotlin/co/nilin/opex/matching/gateway/app/service/OrderService.kt @@ -1,17 +1,16 @@ package co.nilin.opex.matching.gateway.app.service -import co.nilin.opex.matching.engine.core.eventh.events.CancelOrderEvent import co.nilin.opex.matching.engine.core.model.OrderDirection import co.nilin.opex.matching.engine.core.model.Pair import co.nilin.opex.matching.gateway.app.inout.CancelOrderRequest import co.nilin.opex.matching.gateway.app.inout.CreateOrderRequest import co.nilin.opex.matching.gateway.app.spi.AccountantApiProxy import co.nilin.opex.matching.gateway.app.spi.PairConfigLoader -import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderCancelRequestEvent +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequestEvent import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitResult -import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.EventSubmitter import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.KafkaHealthIndicator -import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderSubmitter +import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderRequestEventSubmitter import co.nilin.opex.utility.error.data.OpexError import co.nilin.opex.utility.error.data.OpexException import org.slf4j.LoggerFactory @@ -21,8 +20,7 @@ import java.math.BigDecimal @Service class OrderService( val accountantApiProxy: AccountantApiProxy, - val orderSubmitter: OrderSubmitter, - val eventSubmitter: EventSubmitter, + val orderRequestEventSubmitter: OrderRequestEventSubmitter, val pairConfigLoader: PairConfigLoader, private val kafkaHealthIndicator: KafkaHealthIndicator, ) { @@ -55,7 +53,7 @@ class OrderService( if (!kafkaHealthIndicator.isHealthy) throw OpexException(OpexError.ServiceUnavailable) - val orderSubmitRequest = OrderSubmitRequest( + val orderSubmitRequest = OrderSubmitRequestEvent( createOrderRequest.uuid!!, //get from auth2 Pair(symbolSides[0], symbolSides[1]), createOrderRequest.price @@ -69,12 +67,12 @@ class OrderService( createOrderRequest.orderType, createOrderRequest.userLevel ) - return orderSubmitter.submit(orderSubmitRequest) + return orderRequestEventSubmitter.submit(orderSubmitRequest) } suspend fun cancelOrder(request: CancelOrderRequest): OrderSubmitResult { val symbols = request.symbol.split("_") - val event = CancelOrderEvent(request.ouid, request.uuid, request.orderId, Pair(symbols[0], symbols[1])) - return eventSubmitter.submit(event) + val event = OrderCancelRequestEvent(request.ouid, request.uuid, Pair(symbols[0], symbols[1]), request.orderId) + return orderRequestEventSubmitter.submit(event) } } diff --git a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt index 7c09ca47b..306586dff 100644 --- a/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt +++ b/matching-gateway/matching-gateway-app/src/test/kotlin/co/nilin/opex/matching/gateway/app/service/OrderServiceTest.kt @@ -7,7 +7,7 @@ import co.nilin.opex.matching.gateway.app.spi.PairConfigLoader import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitResult import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.EventSubmitter import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.KafkaHealthIndicator -import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderSubmitter +import co.nilin.opex.matching.gateway.ports.kafka.submitter.service.OrderRequestEventSubmitter import io.mockk.* import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat @@ -17,14 +17,13 @@ import java.math.BigDecimal private class OrderServiceTest { private val accountantApiProxy: AccountantApiProxy = mockk() - private val orderSubmitter: OrderSubmitter = mockk() - private val eventSubmitter: EventSubmitter = mockk() + private val orderRequestEventSubmitter: OrderRequestEventSubmitter = mockk() + private val eventSubmitter: OrderRequestEventSubmitter = mockk() private val pairConfigLoader: PairConfigLoader = mockk() private val kafkaHealthIndicator: KafkaHealthIndicator = mockk() private val orderService: OrderService = OrderService( accountantApiProxy, - orderSubmitter, - eventSubmitter, + orderRequestEventSubmitter, pairConfigLoader, kafkaHealthIndicator ) @@ -44,7 +43,7 @@ private class OrderServiceTest { ) } returns true coEvery { - orderSubmitter.submit(any()) + orderRequestEventSubmitter.submit(any()) } returns OrderSubmitResult(null) coEvery { kafkaHealthIndicator.isHealthy @@ -66,7 +65,7 @@ private class OrderServiceTest { ) } returns true coEvery { - orderSubmitter.submit(any()) + orderRequestEventSubmitter.submit(any()) } returns OrderSubmitResult(null) coEvery { kafkaHealthIndicator.isHealthy @@ -175,15 +174,4 @@ private class OrderServiceTest { } }.isNotInstanceOf(MockKException::class.java) } - - @Test - fun givenEventSubmitter_whenCancelOrder_thenOrderSubmitResult(): Unit = runBlocking { - coEvery { - eventSubmitter.submit(any()) - } returns OrderSubmitResult(null) - - val orderSubmitResult = orderService.cancelOrder(VALID.CANCEL_ORDER_REQUEST) - - assertThat(orderSubmitResult).isNotNull - } } diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt index a8dab0441..0e0bf5189 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/config/OrderKafkaConfig.kt @@ -1,7 +1,8 @@ package co.nilin.opex.matching.gateway.ports.kafka.submitter.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderRequestEvent +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequestEvent import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Qualifier @@ -11,6 +12,7 @@ import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer import org.springframework.kafka.support.serializer.JsonSerializer @Configuration @@ -26,17 +28,17 @@ class OrderKafkaConfig { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", - JsonSerializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderCancelRequestEvent" ) } @Bean("orderProducerFactory") - fun producerFactory(@Qualifier("orderProducerConfigs") producerConfigs: Map): ProducerFactory { + fun producerFactory(@Qualifier("orderProducerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @Bean("orderKafkaTemplate") - fun kafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + fun kafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt new file mode 100644 index 000000000..c9b467a20 --- /dev/null +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderCancelRequestEvent.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.matching.gateway.ports.kafka.submitter.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderCancelRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val orderId: Long +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt new file mode 100644 index 000000000..462f2f58a --- /dev/null +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderRequestEvent.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.matching.gateway.ports.kafka.submitter.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair) \ No newline at end of file diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequest.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequestEvent.kt similarity index 80% rename from matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequest.kt rename to matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequestEvent.kt index d6c5eb05e..099c06a2a 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequest.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/inout/OrderSubmitRequestEvent.kt @@ -6,15 +6,14 @@ import co.nilin.opex.matching.engine.core.model.OrderType import co.nilin.opex.matching.engine.core.model.Pair import java.util.* -data class OrderSubmitRequest( - val uuid: String, - val pair: Pair, +class OrderSubmitRequestEvent( + uuid: String, + pair: Pair, val price: Long, val quantity: Long, val direction: OrderDirection, val matchConstraint: MatchConstraint, val orderType: OrderType, val userLevel: String, - val ouid: String = UUID.randomUUID().toString(), val orderId: Long? = null, -) \ No newline at end of file +) : OrderRequestEvent(UUID.randomUUID().toString(), uuid, pair) \ No newline at end of file diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderRequestEventSubmitter.kt similarity index 69% rename from matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt rename to matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderRequestEventSubmitter.kt index 57ed1bd61..d66ee7ac6 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderRequestEventSubmitter.kt @@ -1,6 +1,6 @@ package co.nilin.opex.matching.gateway.ports.kafka.submitter.service -import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest +import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderRequestEvent import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitResult import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -10,12 +10,12 @@ import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @Component -class OrderSubmitter(val kafkaTemplate: KafkaTemplate) { +class OrderRequestEventSubmitter(val kafkaTemplate: KafkaTemplate) { - private val logger = LoggerFactory.getLogger(OrderSubmitter::class.java) + private val logger = LoggerFactory.getLogger(OrderRequestEventSubmitter::class.java) - suspend fun submit(order: OrderSubmitRequest): OrderSubmitResult = suspendCoroutine { cont -> - logger.info("Submitting OrderSubmitRequest: ouid=${order.ouid}") + suspend fun submit(order: OrderRequestEvent): OrderSubmitResult = suspendCoroutine { cont -> + logger.info("Submitting OrderRequestEvent: ouid=${order.ouid}") val sendFuture = kafkaTemplate.send("orders_${order.pair.leftSideName}_${order.pair.rightSideName}", order) sendFuture.addCallback({ diff --git a/preferences-dev.yml b/preferences-dev.yml index d8d616c99..87e30f3ef 100644 --- a/preferences-dev.yml +++ b/preferences-dev.yml @@ -10,38 +10,84 @@ chains: - url: http://bitcoin-scanner:8080 maxBlockRange: 30 delayOnRateLimit: 5 - schedule: - delay: 600 - errorDelay: 60 - timeout: 30 - maxRetries: 5 - confirmations: 0 - enabled: false + maxParallelCall: 2 + schedules: + - workerType: MAIN + delay: 600 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 4 + enabled: false + - workerType: ERROR + delay: 600 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 4 + enabled: false + - workerType: DELAYED + delay: 300 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 2 + enabled: false - name: test-ethereum addressType: ethereum scanners: - url: http://ethereum-scanner:8080 maxBlockRange: 30 delayOnRateLimit: 5 - schedule: - delay: 15 - errorDelay: 7 - timeout: 30 - maxRetries: 5 - confirmations: 0 - enabled: false + maxParallelCall: 3 + schedules: + - workerType: MAIN + delay: 15 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 10 + enabled: false + - workerType: ERROR + delay: 7 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 10 + enabled: false + - workerType: DELAYED + delay: 15 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 5 + enabled: false - name: test-bsc addressType: ethereum scanners: - url: http://bsc-scanner:8080 maxBlockRange: 10 delayOnRateLimit: 300 - schedule: - delay: 6 - errorDelay: 3 - timeout: 30 - maxRetries: 50 - confirmations: 0 + maxParallelCall: 5 + schedules: + - workerType: MAIN + delay: 6 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 30 + - workerType: ERROR + delay: 3 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 20 + - workerType: DELAYED + delay: 10 + timeout: 30 + maxRetries: 5 + confirmations: 0 + maxBlockCount: 10 currencies: - symbol: IRT name: Toman From 337077712bd4690976ccb3550b302a01b3115ae1 Mon Sep 17 00:00:00 2001 From: Peyman Date: Wed, 21 Jun 2023 13:09:53 +0330 Subject: [PATCH 7/8] Fix eventlog kafka config --- .../eventlog/app/listeners/OrderListener.kt | 32 ++++++++------- .../listener/config/EventLogKafkaConfig.kt | 10 ++++- .../listener/consumer/EventKafkaListener.kt | 4 +- .../listener/consumer/OrderKafkaListener.kt | 10 +++-- .../listener/inout/OrderCancelRequestEvent.kt | 10 +++++ .../kafka/listener/inout/OrderRequestEvent.kt | 5 +++ .../listener/inout/OrderSubmitRequest.kt | 41 ------------------- .../listener/inout/OrderSubmitRequestEvent.kt | 19 +++++++++ .../spi/OrderSubmitRequestListener.kt | 4 +- 9 files changed, 70 insertions(+), 65 deletions(-) create mode 100644 eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderCancelRequestEvent.kt create mode 100644 eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderRequestEvent.kt delete mode 100644 eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequest.kt create mode 100644 eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt diff --git a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/OrderListener.kt b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/OrderListener.kt index c4cc54a06..af74f16a5 100644 --- a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/OrderListener.kt +++ b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/OrderListener.kt @@ -1,7 +1,8 @@ package co.nilin.opex.eventlog.app.listeners import co.nilin.opex.eventlog.core.spi.OrderPersister -import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderRequestEvent +import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequestEvent import co.nilin.opex.eventlog.ports.kafka.listener.spi.OrderSubmitRequestListener import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent @@ -11,20 +12,21 @@ class OrderListener(private val orderPersister: OrderPersister) : OrderSubmitReq return "OrderListener" } - override suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { - orderPersister.submitOrder( - SubmitOrderEvent( - order.ouid, - order.uuid, - order.orderId, - order.pair, - order.price, - order.quantity, - 0, - order.direction, - order.matchConstraint, - order.orderType + override suspend fun onOrder(order: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) { + if (order is OrderSubmitRequestEvent) + orderPersister.submitOrder( + SubmitOrderEvent( + order.ouid, + order.uuid, + order.orderId, + order.pair, + order.price, + order.quantity, + 0, + order.direction, + order.matchConstraint, + order.orderType + ) ) - ) } } \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt index d984b6792..c21ddba45 100644 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt @@ -4,6 +4,7 @@ import co.nilin.opex.eventlog.ports.kafka.listener.consumer.DLTKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.OrderKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.TradeKafkaListener +import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderRequestEvent import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer @@ -37,7 +38,7 @@ class EventLogKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequest" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderCancelRequestEvent" ) } @@ -57,6 +58,11 @@ class EventLogKafkaConfig { return DefaultKafkaConsumerFactory(consumerConfigs) } + @Bean("orderRequestConsumerFactory") + fun orderRequestConsumerFactory(@Qualifier("eventLogConsumerConfig") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } + @Bean fun dltConsumerFactory(@Qualifier("dltConsumerConfig") configs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(configs) @@ -92,7 +98,7 @@ class EventLogKafkaConfig { @ConditionalOnBean(OrderKafkaListener::class) fun configureOrderListener( orderListener: OrderKafkaListener, - @Qualifier("eventLogConsumerFactory") consumerFactory: ConsumerFactory + @Qualifier("orderRequestConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("orders_.*")) containerProps.messageListener = orderListener diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/EventKafkaListener.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/EventKafkaListener.kt index ef6409bc4..d0471715f 100644 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/EventKafkaListener.kt +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/EventKafkaListener.kt @@ -9,7 +9,9 @@ import org.springframework.stereotype.Component @Component class EventKafkaListener : MessageListener { - val eventListeners = arrayListOf() + + private val eventListeners = arrayListOf() + override fun onMessage(data: ConsumerRecord) { eventListeners.forEach { tl -> tl.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/OrderKafkaListener.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/OrderKafkaListener.kt index 58ffa6310..34ffd9534 100644 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/OrderKafkaListener.kt +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/OrderKafkaListener.kt @@ -1,6 +1,6 @@ package co.nilin.opex.eventlog.ports.kafka.listener.consumer -import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderRequestEvent import co.nilin.opex.eventlog.ports.kafka.listener.spi.OrderSubmitRequestListener import kotlinx.coroutines.ExecutorCoroutineDispatcher import kotlinx.coroutines.runBlocking @@ -10,9 +10,11 @@ import org.springframework.kafka.listener.MessageListener class OrderKafkaListener(private val executorCoroutineDispatcher: ExecutorCoroutineDispatcher) : - MessageListener { - val orderListeners = arrayListOf() - override fun onMessage(data: ConsumerRecord) { + MessageListener { + + private val orderListeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { runBlocking { orderListeners.forEach { tl -> withContext(executorCoroutineDispatcher) { diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderCancelRequestEvent.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderCancelRequestEvent.kt new file mode 100644 index 000000000..169e0de01 --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderCancelRequestEvent.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.eventlog.ports.kafka.listener.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderCancelRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val orderId: Long +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderRequestEvent.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderRequestEvent.kt new file mode 100644 index 000000000..d0d6381cf --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderRequestEvent.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.eventlog.ports.kafka.listener.inout + +import co.nilin.opex.matching.engine.core.model.Pair + +abstract class OrderRequestEvent(val ouid:String, val uuid: String, val pair: Pair) \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequest.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequest.kt deleted file mode 100644 index cac421629..000000000 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequest.kt +++ /dev/null @@ -1,41 +0,0 @@ -package co.nilin.opex.eventlog.ports.kafka.listener.inout - -import co.nilin.opex.matching.engine.core.model.MatchConstraint -import co.nilin.opex.matching.engine.core.model.OrderDirection -import co.nilin.opex.matching.engine.core.model.OrderType - -public class OrderSubmitRequest() { - lateinit var ouid: String - lateinit var uuid: String - var orderId: Long? = null - lateinit var pair: co.nilin.opex.matching.engine.core.model.Pair - var price: Long = 0 - var quantity: Long = 0 - var direction: OrderDirection = OrderDirection.BID - var matchConstraint: MatchConstraint = MatchConstraint.GTC - var orderType: OrderType = OrderType.LIMIT_ORDER - - constructor( - ouid: String, - uuid: String, - orderId: Long?, - pair: co.nilin.opex.matching.engine.core.model.Pair, - price: Long, - quantity: Long, - direction: OrderDirection, - matchConstraint: MatchConstraint, - orderType: OrderType - ) : this() { - this.ouid = ouid - this.uuid = uuid - this.orderId = orderId - this.pair = pair - this.price = price - this.quantity = quantity - this.direction = direction - this.matchConstraint = matchConstraint - this.orderType = orderType - } - - -} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt new file mode 100644 index 000000000..b156121eb --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/inout/OrderSubmitRequestEvent.kt @@ -0,0 +1,19 @@ +package co.nilin.opex.eventlog.ports.kafka.listener.inout + +import co.nilin.opex.matching.engine.core.model.MatchConstraint +import co.nilin.opex.matching.engine.core.model.OrderDirection +import co.nilin.opex.matching.engine.core.model.OrderType +import co.nilin.opex.matching.engine.core.model.Pair + +class OrderSubmitRequestEvent( + ouid: String, + uuid: String, + pair: Pair, + val price: Long, + val quantity: Long, + val direction: OrderDirection, + val matchConstraint: MatchConstraint, + val orderType: OrderType, + val userLevel: String, + val orderId: Long? = null, +) : OrderRequestEvent(ouid, uuid, pair) \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/spi/OrderSubmitRequestListener.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/spi/OrderSubmitRequestListener.kt index 4cae57cb1..1c618ba95 100644 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/spi/OrderSubmitRequestListener.kt +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/spi/OrderSubmitRequestListener.kt @@ -1,8 +1,8 @@ package co.nilin.opex.eventlog.ports.kafka.listener.spi -import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderRequestEvent interface OrderSubmitRequestListener { fun id(): String - suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) + suspend fun onOrder(order: OrderRequestEvent, partition: Int, offset: Long, timestamp: Long) } \ No newline at end of file From 61f0c5adc570921e749b07ba24452dd9ef204931 Mon Sep 17 00:00:00 2001 From: Peyman Date: Sat, 24 Jun 2023 16:58:56 +0330 Subject: [PATCH 8/8] Bump up version --- accountant/accountant-app/pom.xml | 2 +- accountant/accountant-core/pom.xml | 2 +- .../accountant-ports/accountant-eventlistener-kafka/pom.xml | 2 +- .../accountant-ports/accountant-persister-postgres/pom.xml | 2 +- accountant/accountant-ports/accountant-submitter-kafka/pom.xml | 2 +- accountant/accountant-ports/accountant-wallet-proxy/pom.xml | 2 +- accountant/pom.xml | 2 +- admin/admin-app/pom.xml | 2 +- admin/admin-core/pom.xml | 2 +- admin/admin-ports/admin-service-auth/pom.xml | 2 +- admin/admin-ports/admin-submitter-kafka/pom.xml | 2 +- admin/pom.xml | 2 +- api/api-app/pom.xml | 2 +- api/api-core/pom.xml | 2 +- api/api-ports/api-binance-rest/pom.xml | 2 +- api/api-ports/api-persister-postgres/pom.xml | 2 +- api/api-ports/api-proxy-rest/pom.xml | 2 +- api/pom.xml | 2 +- bc-gateway/bc-gateway-app/pom.xml | 2 +- bc-gateway/bc-gateway-core/pom.xml | 2 +- .../bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml | 2 +- .../bc-gateway-ports/bc-gateway-persister-postgres/pom.xml | 2 +- bc-gateway/bc-gateway-ports/bc-gateway-wallet-proxy/pom.xml | 2 +- bc-gateway/pom.xml | 2 +- captcha/captcha-app/pom.xml | 2 +- captcha/pom.xml | 2 +- eventlog/eventlog-app/pom.xml | 2 +- eventlog/eventlog-core/pom.xml | 2 +- eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml | 2 +- eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml | 2 +- eventlog/pom.xml | 2 +- market/market-app/pom.xml | 2 +- market/market-core/pom.xml | 2 +- market/market-ports/market-eventlistener-kafka/pom.xml | 2 +- market/market-ports/market-persister-postgres/pom.xml | 2 +- market/pom.xml | 2 +- matching-engine/matching-engine-app/pom.xml | 2 +- matching-engine/matching-engine-core/pom.xml | 2 +- .../matching-engine-eventlistener-kafka/pom.xml | 2 +- .../matching-engine-snapshots-redis/pom.xml | 2 +- .../matching-engine-submitter-kafka/pom.xml | 2 +- matching-engine/pom.xml | 2 +- matching-gateway/matching-gateway-app/pom.xml | 2 +- .../matching-gateway-submitter-kafka/pom.xml | 2 +- matching-gateway/pom.xml | 2 +- pom.xml | 2 +- referral/pom.xml | 2 +- referral/referral-app/pom.xml | 2 +- referral/referral-core/pom.xml | 2 +- referral/referral-ports/referral-api-proxy/pom.xml | 2 +- referral/referral-ports/referral-eventlistener-kafka/pom.xml | 2 +- referral/referral-ports/referral-persister-postgres/pom.xml | 2 +- referral/referral-ports/referral-wallet-proxy/pom.xml | 2 +- storage/pom.xml | 2 +- storage/storage-app/pom.xml | 2 +- user-management/keycloak-gateway/pom.xml | 2 +- user-management/pom.xml | 2 +- utility/error-handler/pom.xml | 2 +- utility/interceptors/pom.xml | 2 +- utility/logging-handler/pom.xml | 2 +- utility/pom.xml | 2 +- utility/preferences/pom.xml | 2 +- wallet/pom.xml | 2 +- wallet/wallet-app/pom.xml | 2 +- wallet/wallet-core/pom.xml | 2 +- wallet/wallet-ports/wallet-eventlistener-kafka/pom.xml | 2 +- wallet/wallet-ports/wallet-persister-postgres/pom.xml | 2 +- websocket/pom.xml | 2 +- websocket/websocket-app/pom.xml | 2 +- websocket/websocket-core/pom.xml | 2 +- websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml | 2 +- websocket/websocket-ports/websocket-persister-postgres/pom.xml | 2 +- 72 files changed, 72 insertions(+), 72 deletions(-) diff --git a/accountant/accountant-app/pom.xml b/accountant/accountant-app/pom.xml index fdce361da..fe50e30f2 100644 --- a/accountant/accountant-app/pom.xml +++ b/accountant/accountant-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.accountant accountant - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.accountant.app diff --git a/accountant/accountant-core/pom.xml b/accountant/accountant-core/pom.xml index d6c6b8c46..7d5b52bd7 100644 --- a/accountant/accountant-core/pom.xml +++ b/accountant/accountant-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.accountant accountant - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.accountant.core diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml b/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml index 098453349..a27cadb8f 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.accountant accountant - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/accountant/accountant-ports/accountant-persister-postgres/pom.xml b/accountant/accountant-ports/accountant-persister-postgres/pom.xml index 8d0ea453d..8e7abf9c3 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/pom.xml +++ b/accountant/accountant-ports/accountant-persister-postgres/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.accountant accountant - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/accountant/accountant-ports/accountant-submitter-kafka/pom.xml b/accountant/accountant-ports/accountant-submitter-kafka/pom.xml index fdfb15aab..2759d98c3 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/pom.xml +++ b/accountant/accountant-ports/accountant-submitter-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.accountant accountant - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/accountant/accountant-ports/accountant-wallet-proxy/pom.xml b/accountant/accountant-ports/accountant-wallet-proxy/pom.xml index bb691dafd..35009ff17 100644 --- a/accountant/accountant-ports/accountant-wallet-proxy/pom.xml +++ b/accountant/accountant-ports/accountant-wallet-proxy/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.accountant accountant - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/accountant/pom.xml b/accountant/pom.xml index f59ad3e2d..81c746ee9 100644 --- a/accountant/pom.xml +++ b/accountant/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.accountant diff --git a/admin/admin-app/pom.xml b/admin/admin-app/pom.xml index 7115740f1..e1d8f28c3 100644 --- a/admin/admin-app/pom.xml +++ b/admin/admin-app/pom.xml @@ -8,7 +8,7 @@ co.nilin.opex.admin admin - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.admin.gateway diff --git a/admin/admin-core/pom.xml b/admin/admin-core/pom.xml index 8556da1a3..6590a42fa 100644 --- a/admin/admin-core/pom.xml +++ b/admin/admin-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.admin admin - 1.0.0-beta.3 + 1.0.1-beta.7 admin-core diff --git a/admin/admin-ports/admin-service-auth/pom.xml b/admin/admin-ports/admin-service-auth/pom.xml index 649aadf90..a248e892d 100644 --- a/admin/admin-ports/admin-service-auth/pom.xml +++ b/admin/admin-ports/admin-service-auth/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.admin admin - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/admin/admin-ports/admin-submitter-kafka/pom.xml b/admin/admin-ports/admin-submitter-kafka/pom.xml index c70fd7369..bcae56291 100644 --- a/admin/admin-ports/admin-submitter-kafka/pom.xml +++ b/admin/admin-ports/admin-submitter-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.admin admin - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/admin/pom.xml b/admin/pom.xml index 892b4ddec..44d744b78 100644 --- a/admin/pom.xml +++ b/admin/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.admin diff --git a/api/api-app/pom.xml b/api/api-app/pom.xml index c58df1517..aa1f99523 100644 --- a/api/api-app/pom.xml +++ b/api/api-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.api api - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.api.app diff --git a/api/api-core/pom.xml b/api/api-core/pom.xml index 9cc1cb947..3bd667b98 100644 --- a/api/api-core/pom.xml +++ b/api/api-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.api api - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.api.core diff --git a/api/api-ports/api-binance-rest/pom.xml b/api/api-ports/api-binance-rest/pom.xml index 742dcd9ee..4b0f696e8 100644 --- a/api/api-ports/api-binance-rest/pom.xml +++ b/api/api-ports/api-binance-rest/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.api api - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/api/api-ports/api-persister-postgres/pom.xml b/api/api-ports/api-persister-postgres/pom.xml index 83c6cd821..57b4c10bb 100644 --- a/api/api-ports/api-persister-postgres/pom.xml +++ b/api/api-ports/api-persister-postgres/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.api api - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/api/api-ports/api-proxy-rest/pom.xml b/api/api-ports/api-proxy-rest/pom.xml index bc3bcc88d..b18685b7d 100644 --- a/api/api-ports/api-proxy-rest/pom.xml +++ b/api/api-ports/api-proxy-rest/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.api api - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/api/pom.xml b/api/pom.xml index f3d0726c2..bf1f6c8ab 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.api diff --git a/bc-gateway/bc-gateway-app/pom.xml b/bc-gateway/bc-gateway-app/pom.xml index d9aaf6386..4d7c81e51 100644 --- a/bc-gateway/bc-gateway-app/pom.xml +++ b/bc-gateway/bc-gateway-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.bcgateway bc-gateway - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.bcgateway.app diff --git a/bc-gateway/bc-gateway-core/pom.xml b/bc-gateway/bc-gateway-core/pom.xml index 9f767a6ce..e158bcfb1 100644 --- a/bc-gateway/bc-gateway-core/pom.xml +++ b/bc-gateway/bc-gateway-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.bcgateway bc-gateway - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.bcgateway.core diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml index 891e8f361..3afa9b0f1 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.bcgateway bc-gateway - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml index d5bda18ac..6ef20f5c7 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.bcgateway bc-gateway - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-wallet-proxy/pom.xml b/bc-gateway/bc-gateway-ports/bc-gateway-wallet-proxy/pom.xml index e1846e17f..3bc027eee 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-wallet-proxy/pom.xml +++ b/bc-gateway/bc-gateway-ports/bc-gateway-wallet-proxy/pom.xml @@ -7,7 +7,7 @@ co.nilin.opex.bcgateway bc-gateway - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/bc-gateway/pom.xml b/bc-gateway/pom.xml index ad6fa8e84..8fa2138af 100644 --- a/bc-gateway/pom.xml +++ b/bc-gateway/pom.xml @@ -4,7 +4,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 4.0.0 diff --git a/captcha/captcha-app/pom.xml b/captcha/captcha-app/pom.xml index bb347a017..42bcbefa0 100644 --- a/captcha/captcha-app/pom.xml +++ b/captcha/captcha-app/pom.xml @@ -7,7 +7,7 @@ captcha co.nilin.opex.captcha - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.captcha.app diff --git a/captcha/pom.xml b/captcha/pom.xml index 662409a22..539f6b665 100644 --- a/captcha/pom.xml +++ b/captcha/pom.xml @@ -7,7 +7,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.captcha diff --git a/eventlog/eventlog-app/pom.xml b/eventlog/eventlog-app/pom.xml index d619a363b..161a09170 100644 --- a/eventlog/eventlog-app/pom.xml +++ b/eventlog/eventlog-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.eventlog eventlog - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.eventlog.app diff --git a/eventlog/eventlog-core/pom.xml b/eventlog/eventlog-core/pom.xml index e914272ed..0b8ad9765 100644 --- a/eventlog/eventlog-core/pom.xml +++ b/eventlog/eventlog-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.eventlog eventlog - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.eventlog.core diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml index 19ed7de1f..9a6857784 100644 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.eventlog eventlog - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml b/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml index be2b16d8f..9df95effb 100644 --- a/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml +++ b/eventlog/eventlog-ports/eventlog-persister-postgres/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.eventlog eventlog - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/eventlog/pom.xml b/eventlog/pom.xml index 07443a8eb..8caf08516 100644 --- a/eventlog/pom.xml +++ b/eventlog/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.eventlog diff --git a/market/market-app/pom.xml b/market/market-app/pom.xml index a867c9541..465b7083b 100644 --- a/market/market-app/pom.xml +++ b/market/market-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.market market - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.market.app diff --git a/market/market-core/pom.xml b/market/market-core/pom.xml index 146107297..a3793c872 100644 --- a/market/market-core/pom.xml +++ b/market/market-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.market market - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.market.core diff --git a/market/market-ports/market-eventlistener-kafka/pom.xml b/market/market-ports/market-eventlistener-kafka/pom.xml index 27abe4901..1106bbefd 100644 --- a/market/market-ports/market-eventlistener-kafka/pom.xml +++ b/market/market-ports/market-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.market market - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/market/market-ports/market-persister-postgres/pom.xml b/market/market-ports/market-persister-postgres/pom.xml index 62f06eade..59add75f5 100644 --- a/market/market-ports/market-persister-postgres/pom.xml +++ b/market/market-ports/market-persister-postgres/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.market market - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/market/pom.xml b/market/pom.xml index f81d36072..27a0abd85 100644 --- a/market/pom.xml +++ b/market/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.market diff --git a/matching-engine/matching-engine-app/pom.xml b/matching-engine/matching-engine-app/pom.xml index 0b3c52b29..0ca275569 100644 --- a/matching-engine/matching-engine-app/pom.xml +++ b/matching-engine/matching-engine-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.matching.engine matching-engine - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.matching.engine.app diff --git a/matching-engine/matching-engine-core/pom.xml b/matching-engine/matching-engine-core/pom.xml index 94c622dda..25c222bae 100644 --- a/matching-engine/matching-engine-core/pom.xml +++ b/matching-engine/matching-engine-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.matching.engine matching-engine - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.matching.engine.core diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/pom.xml b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/pom.xml index c13b4d4bb..981aaeb33 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/pom.xml +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.matching.engine matching-engine - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/matching-engine/matching-engine-ports/matching-engine-snapshots-redis/pom.xml b/matching-engine/matching-engine-ports/matching-engine-snapshots-redis/pom.xml index cd3681137..042b22c13 100644 --- a/matching-engine/matching-engine-ports/matching-engine-snapshots-redis/pom.xml +++ b/matching-engine/matching-engine-ports/matching-engine-snapshots-redis/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.matching.engine matching-engine - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/pom.xml b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/pom.xml index 4f0b14e8a..b13662c63 100644 --- a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/pom.xml +++ b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.matching.engine matching-engine - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/matching-engine/pom.xml b/matching-engine/pom.xml index dda7ab546..14206642b 100644 --- a/matching-engine/pom.xml +++ b/matching-engine/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.matching.engine diff --git a/matching-gateway/matching-gateway-app/pom.xml b/matching-gateway/matching-gateway-app/pom.xml index 552cfefcd..3fad0fc4e 100644 --- a/matching-gateway/matching-gateway-app/pom.xml +++ b/matching-gateway/matching-gateway-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.matching.gateway matching-gateway - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.matching.gateway.app diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/pom.xml b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/pom.xml index 00cd71bed..9ef385922 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/pom.xml +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.matching.gateway matching-gateway - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/matching-gateway/pom.xml b/matching-gateway/pom.xml index 9e4d808f4..e131abb43 100644 --- a/matching-gateway/pom.xml +++ b/matching-gateway/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.matching.gateway diff --git a/pom.xml b/pom.xml index ec0f9f998..3aa8af234 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ co.nilin.opex core pom - 1.0.0-beta.3 + 1.0.1-beta.7 11 diff --git a/referral/pom.xml b/referral/pom.xml index 156df69ae..7b8e82223 100644 --- a/referral/pom.xml +++ b/referral/pom.xml @@ -7,7 +7,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.referral diff --git a/referral/referral-app/pom.xml b/referral/referral-app/pom.xml index 1c6c7dea3..f776f7026 100644 --- a/referral/referral-app/pom.xml +++ b/referral/referral-app/pom.xml @@ -7,7 +7,7 @@ referral co.nilin.opex.referral - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.referral.app diff --git a/referral/referral-core/pom.xml b/referral/referral-core/pom.xml index 21e9b57d3..39459a17b 100644 --- a/referral/referral-core/pom.xml +++ b/referral/referral-core/pom.xml @@ -7,7 +7,7 @@ referral co.nilin.opex.referral - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.referral.core diff --git a/referral/referral-ports/referral-api-proxy/pom.xml b/referral/referral-ports/referral-api-proxy/pom.xml index 351a431c0..417f23715 100644 --- a/referral/referral-ports/referral-api-proxy/pom.xml +++ b/referral/referral-ports/referral-api-proxy/pom.xml @@ -7,7 +7,7 @@ referral co.nilin.opex.referral - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/referral/referral-ports/referral-eventlistener-kafka/pom.xml b/referral/referral-ports/referral-eventlistener-kafka/pom.xml index d71f8d8ad..3bf31f417 100644 --- a/referral/referral-ports/referral-eventlistener-kafka/pom.xml +++ b/referral/referral-ports/referral-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.referral referral - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/referral/referral-ports/referral-persister-postgres/pom.xml b/referral/referral-ports/referral-persister-postgres/pom.xml index b16283529..9634168d0 100644 --- a/referral/referral-ports/referral-persister-postgres/pom.xml +++ b/referral/referral-ports/referral-persister-postgres/pom.xml @@ -7,7 +7,7 @@ referral co.nilin.opex.referral - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/referral/referral-ports/referral-wallet-proxy/pom.xml b/referral/referral-ports/referral-wallet-proxy/pom.xml index f634dd19c..b4635559a 100644 --- a/referral/referral-ports/referral-wallet-proxy/pom.xml +++ b/referral/referral-ports/referral-wallet-proxy/pom.xml @@ -7,7 +7,7 @@ referral co.nilin.opex.referral - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/storage/pom.xml b/storage/pom.xml index 48a5a6e17..e397cb848 100644 --- a/storage/pom.xml +++ b/storage/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.storage diff --git a/storage/storage-app/pom.xml b/storage/storage-app/pom.xml index be4dbbfdd..23468cde0 100644 --- a/storage/storage-app/pom.xml +++ b/storage/storage-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.storage storage - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.storage.app diff --git a/user-management/keycloak-gateway/pom.xml b/user-management/keycloak-gateway/pom.xml index f86de02b1..63f9451ce 100644 --- a/user-management/keycloak-gateway/pom.xml +++ b/user-management/keycloak-gateway/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.auth user-management - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.auth.gateway diff --git a/user-management/pom.xml b/user-management/pom.xml index 1d8c5530e..b7786442c 100644 --- a/user-management/pom.xml +++ b/user-management/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.auth diff --git a/utility/error-handler/pom.xml b/utility/error-handler/pom.xml index 626c0971e..980344bb2 100644 --- a/utility/error-handler/pom.xml +++ b/utility/error-handler/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.utility utility - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.utility.error diff --git a/utility/interceptors/pom.xml b/utility/interceptors/pom.xml index 181eed6b7..d614475c0 100644 --- a/utility/interceptors/pom.xml +++ b/utility/interceptors/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.utility utility - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.utility.interceptors diff --git a/utility/logging-handler/pom.xml b/utility/logging-handler/pom.xml index bd9629bd1..0c0f0a7ee 100644 --- a/utility/logging-handler/pom.xml +++ b/utility/logging-handler/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.utility utility - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.utility.log diff --git a/utility/pom.xml b/utility/pom.xml index db80c6b8b..f6018bbcb 100644 --- a/utility/pom.xml +++ b/utility/pom.xml @@ -7,7 +7,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.utility diff --git a/utility/preferences/pom.xml b/utility/preferences/pom.xml index 9a626836d..54a64b8e9 100644 --- a/utility/preferences/pom.xml +++ b/utility/preferences/pom.xml @@ -6,7 +6,7 @@ utility co.nilin.opex.utility - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.utility.preferences diff --git a/wallet/pom.xml b/wallet/pom.xml index c298959b0..3816eadf8 100644 --- a/wallet/pom.xml +++ b/wallet/pom.xml @@ -6,7 +6,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.wallet diff --git a/wallet/wallet-app/pom.xml b/wallet/wallet-app/pom.xml index 73cbb80c9..c363f4d04 100644 --- a/wallet/wallet-app/pom.xml +++ b/wallet/wallet-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.wallet wallet - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.wallet.app diff --git a/wallet/wallet-core/pom.xml b/wallet/wallet-core/pom.xml index 0fc86ba01..898b4725c 100644 --- a/wallet/wallet-core/pom.xml +++ b/wallet/wallet-core/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.wallet wallet - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.wallet.core diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/pom.xml b/wallet/wallet-ports/wallet-eventlistener-kafka/pom.xml index e569fb821..c2ba42407 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/pom.xml +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.wallet wallet - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/wallet/wallet-ports/wallet-persister-postgres/pom.xml b/wallet/wallet-ports/wallet-persister-postgres/pom.xml index aec02a1aa..24d54ce8a 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/pom.xml +++ b/wallet/wallet-ports/wallet-persister-postgres/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.wallet wallet - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/websocket/pom.xml b/websocket/pom.xml index aa93a4ac8..47f58a347 100644 --- a/websocket/pom.xml +++ b/websocket/pom.xml @@ -7,7 +7,7 @@ core co.nilin.opex - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.websocket diff --git a/websocket/websocket-app/pom.xml b/websocket/websocket-app/pom.xml index 878f99149..e19ce816a 100644 --- a/websocket/websocket-app/pom.xml +++ b/websocket/websocket-app/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.websocket websocket - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.websocket.app diff --git a/websocket/websocket-core/pom.xml b/websocket/websocket-core/pom.xml index e57dc6900..2191e50c1 100644 --- a/websocket/websocket-core/pom.xml +++ b/websocket/websocket-core/pom.xml @@ -7,7 +7,7 @@ co.nilin.opex.websocket websocket - 1.0.0-beta.3 + 1.0.1-beta.7 co.nilin.opex.websocket.core diff --git a/websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml b/websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml index 0ae3d6412..89454d268 100644 --- a/websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml +++ b/websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.websocket websocket - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml diff --git a/websocket/websocket-ports/websocket-persister-postgres/pom.xml b/websocket/websocket-ports/websocket-persister-postgres/pom.xml index 5dd103d7b..06a4d676a 100644 --- a/websocket/websocket-ports/websocket-persister-postgres/pom.xml +++ b/websocket/websocket-ports/websocket-persister-postgres/pom.xml @@ -6,7 +6,7 @@ co.nilin.opex.websocket websocket - 1.0.0-beta.3 + 1.0.1-beta.7 ../../pom.xml