Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions accountant/accountant-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,53 @@
<groupId>co.nilin.opex.utility.preferences</groupId>
<artifactId>preferences</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>3.2.6</version>
<type>test-jar</type>
<scope>test</scope>
<classifier>test-binder</classifier>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.9.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito.kotlin</groupId>
<artifactId>mockito-kotlin</artifactId>
</dependency>
<dependency>
<groupId>io.mockk</groupId>
<artifactId>mockk</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import io.micrometer.core.instrument.MeterRegistry
import org.springframework.boot.actuate.health.HealthComponent
import org.springframework.boot.actuate.health.HealthEndpoint
import org.springframework.boot.actuate.health.SystemHealth
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

@Component
@Profile("!test")
class PrometheusHealthExtension(
private val registry: MeterRegistry,
private val endpoint: HealthEndpoint
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package co.nilin.opex.accountant.app

import org.junit.jupiter.api.Test
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration
import org.springframework.context.annotation.Import
import org.springframework.test.context.ActiveProfiles

@SpringBootTest
@ActiveProfiles("test")
@Import(TestChannelBinderConfiguration::class)
class AccountantAppTest {
@Test
fun contextLoad() {

}
}
50 changes: 50 additions & 0 deletions accountant/accountant-app/src/test/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
server.port: 8080
logging:
level:
co.nilin: DEBUG
reactor.netty.http.client: DEBUG
spring:
application:
name: opex-accountant
main:
allow-bean-definition-overriding: true
allow-circular-references: true
kafka:
bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092}
consumer:
group-id: accountant
r2dbc:
url: r2dbc:tc:postgresql:///accountant?TC_IMAGE_TAG=9.6.8
initialization-mode: always
cloud:
bootstrap:
enabled: true
discovery:
enabled: false
consul:
enabled: false
config:
enabled: false
vault:
enabled: false

management:
health:
vault:
enabled: false
endpoints:
web:
base-path: /actuator
exposure:
include: ["health", "metrics"]
endpoint:
health:
show-details: always
metrics:
enabled: true
prometheus:
enabled: false
app:
address: 1
wallet:
url: ""
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package co.nilin.opex.accountant.core.service

import co.nilin.opex.accountant.core.api.FinancialActionJobManager
import co.nilin.opex.accountant.core.inout.TransferRequest
import co.nilin.opex.accountant.core.model.FinancialAction
import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.spi.FinancialActionLoader
import co.nilin.opex.accountant.core.spi.FinancialActionPersister
import co.nilin.opex.accountant.core.spi.FinancialActionPublisher
import co.nilin.opex.accountant.core.spi.WalletProxy
import org.slf4j.LoggerFactory

Expand All @@ -18,10 +16,17 @@ class FinancialActionJobManagerImpl(

private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java)

override suspend fun processFinancialActions(offset: Long, size: Long) {
val factions = financialActionLoader.loadUnprocessed(offset, size)
override suspend fun processFinancialActions(offset: Long, size: Long){
val factions = financialActionLoader.loadReadyToProcess(offset, size)
factions.forEach {
try {
if (it.parent != null) {
val reloadParent = financialActionLoader.loadFinancialAction(it.parent.id)!!
if (reloadParent.status != FinancialActionStatus.PROCESSED) {
logger.warn("financial job {} skipped because of parent status {}", it, reloadParent)
return@forEach
}
}
walletProxy.transfer(
it.symbol,
it.senderWalletType,
Expand All @@ -32,10 +37,11 @@ class FinancialActionJobManagerImpl(
it.eventType + it.pointer,
null
)
financialActionPersister.updateStatus(it, FinancialActionStatus.PROCESSED)
financialActionPersister.updateStatusNewTx(it, FinancialActionStatus.PROCESSED)

} catch (e: Exception) {
logger.error("financial job error", e)
financialActionPersister.updateStatus(it, FinancialActionStatus.ERROR)
financialActionPersister.updateStatusNewTx(it, FinancialActionStatus.ERROR)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ interface FinancialActionLoader {
suspend fun findLast(userUuid: String, ouid: String): FinancialAction?
suspend fun loadUnprocessed(offset: Long, size: Long): List<FinancialAction>
suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long
suspend fun loadReadyToProcess(offset: Long, size: Long): List<FinancialAction>
suspend fun loadFinancialAction(id: Long?): FinancialAction?
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ interface FinancialActionPersister {
suspend fun updateStatus(faUuid: String, status: FinancialActionStatus)

suspend fun updateBatchStatus(financialAction: List<FinancialAction>, status: FinancialActionStatus)
suspend fun updateStatusNewTx(financialAction: FinancialAction, status: FinancialActionStatus)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ interface FinancialActionRepository : ReactiveCrudRepository<FinancialActionMode
paging: Pageable
): Flow<FinancialActionModel>

@Query("select count(1) from fi_actions fi where fi.sender = :uuid and fi.symbol = :symbol and fi.event_type = :eventType and fi.status = :status")
fun findByUuidAndSymbolAndEventTypeAndStatus(
@Query("select count(1) from fi_actions fi where fi.sender = :uuid and fi.symbol = :symbol and fi.event_type = :eventType and fi.status != :status")
fun countByUuidAndSymbolAndEventTypeAndStatusNot(
@Param("uuid") uuid: String,
@Param("symbol") symbol: String,
@Param("eventType") eventType: String,
@Param("status") financialActionStatus: FinancialActionStatus
): Mono<BigDecimal>

@Query("select * from fi_actions fi where status = :status")
fun findByStatus(@Param("status") status: String, paging: Pageable): Flow<FinancialActionModel>
@Query("select * from fi_actions fi where status != :status")
fun findByStatusNot(@Param("status") status: String, paging: Pageable): Flow<FinancialActionModel>

@Query("update fi_actions set status = :status where id = :id")
fun updateStatus(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono<Int>
Expand All @@ -40,4 +40,9 @@ interface FinancialActionRepository : ReactiveCrudRepository<FinancialActionMode

@Query("update fi_actions set status = :status where id in (:ids)")
fun updateBatchStatus(ids: List<Long>, status: FinancialActionStatus): Mono<Int>
@Query("select * from fi_actions fi where status = 'CREATED' " +
"and ( parent_id is null " +
" or 'ERROR' != (select pfi.status from fi_actions pfi where pfi.id = fi.parent_id)" +
")")
fun findReadyToProcess(of: Pageable): Flow<FinancialActionModel>
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ import java.math.BigDecimal
class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRepository) : FinancialActionLoader {

override suspend fun loadUnprocessed(offset: Long, size: Long): List<FinancialAction> {
return financialActionRepository.findByStatus(
FinancialActionStatus.CREATED.name,
return financialActionRepository.findByStatusNot(
FinancialActionStatus.PROCESSED.name,
PageRequest.of(offset.toInt(), size.toInt(), Sort.by(Sort.Direction.ASC, "createDate"))
).map { loadFinancialAction(it.id)!! }
.toList()
}

override suspend fun loadReadyToProcess(offset: Long, size: Long): List<FinancialAction> {
return financialActionRepository.findReadyToProcess(
PageRequest.of(offset.toInt(), size.toInt(), Sort.by(Sort.Direction.ASC, "createDate"))
).map { loadFinancialAction(it.id)!! }
.toList()
Expand All @@ -33,15 +40,15 @@ class FinancialActionLoaderImpl(val financialActionRepository: FinancialActionRe
}

override suspend fun countUnprocessed(userUuid: String, symbol: String, eventType: String): Long {
return financialActionRepository.findByUuidAndSymbolAndEventTypeAndStatus(
return financialActionRepository.countByUuidAndSymbolAndEventTypeAndStatusNot(
userUuid,
symbol,
eventType,
FinancialActionStatus.CREATED
FinancialActionStatus.PROCESSED
).awaitFirstOrElse { BigDecimal.ZERO }.toLong()
}

private suspend fun loadFinancialAction(id: Long?): FinancialAction? {
override suspend fun loadFinancialAction(id: Long?): FinancialAction? {
if (id != null) {
val fim = financialActionRepository.findById(id).awaitFirst()
return FinancialAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional

@Component
class FinancialActionPersisterImpl(private val financialActionRepository: FinancialActionRepository) :
Expand Down Expand Up @@ -60,9 +62,14 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ
).awaitSingle()
}


override suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) {
financialActionRepository.updateStatus(financialAction.id!!, status).awaitSingleOrNull()
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
override suspend fun updateStatusNewTx(financialAction: FinancialAction, status: FinancialActionStatus) {
financialActionRepository.updateStatus(financialAction.id!!, status).awaitSingleOrNull()
}

override suspend fun updateStatus(faUuid: String, status: FinancialActionStatus) {
financialActionRepository.updateStatus(faUuid, status).awaitSingleOrNull()
Expand Down
2 changes: 1 addition & 1 deletion wallet/wallet-app/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spring:
auto-offset-reset: earliest
group-id: wallet
r2dbc:
url: r2dbc:tc:postgresql:///databasename?TC_IMAGE_TAG=9.6.8
url: r2dbc:tc:postgresql:///wallet?TC_IMAGE_TAG=9.6.8
initialization-mode: always
cloud:
bootstrap:
Expand Down