Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package co.nilin.opex.accountant.core.inout

import java.math.BigDecimal

data class TransferRequest(
val amount: BigDecimal,
val symbol: String,
val senderUuid: String,
val senderWalletType: String,
val receiverUuid: String,
val receiverWalletType: String,
val transferRef: String?,
val description: String?
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package co.nilin.opex.accountant.core.service

import co.nilin.opex.accountant.core.api.FinancialActionJobManager
import co.nilin.opex.accountant.core.inout.TransferRequest
import co.nilin.opex.accountant.core.model.FinancialAction
import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.spi.FinancialActionLoader
import co.nilin.opex.accountant.core.spi.FinancialActionPersister
Expand All @@ -13,31 +15,44 @@ class FinancialActionJobManagerImpl(
private val walletProxy: WalletProxy
) : FinancialActionJobManager {

private val retryLimit = 10
private val log = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java)
private val logger = LoggerFactory.getLogger(FinancialActionJobManagerImpl::class.java)

override suspend fun processFinancialActions(offset: Long, size: Long) {
val factions = financialActionLoader.loadUnprocessed(offset, size)
factions.forEach {
try {
walletProxy.transfer(
val flatten = sortAndFlattenFA(factions)
logger.info("Loaded ${flatten.size} factions: ${flatten.map { it.id }}")
if (factions.isEmpty())
return

try {
val requests = factions.map {
TransferRequest(
it.amount,
it.symbol,
it.senderWalletType,
it.sender,
it.receiverWalletType,
it.senderWalletType,
it.receiver,
it.amount,
it.eventType + it.pointer,
null
)
financialActionPersister.updateStatus(it, FinancialActionStatus.PROCESSED)
} catch (e: Exception) {
log.error("financial job error", e)
financialActionPersister.updateStatus(
it,
if (it.retryCount >= retryLimit) FinancialActionStatus.ERROR else FinancialActionStatus.CREATED
it.receiverWalletType,
null,
it.eventType + it.pointer
)
}
walletProxy.batchTransfer(requests)
financialActionPersister.updateBatchStatus(factions, FinancialActionStatus.PROCESSED)
} catch (e: Exception) {
logger.error("financial job error", e)
}
}

fun sortAndFlattenFA(list: List<FinancialAction>): Collection<FinancialAction> {
val result = arrayListOf<FinancialAction>()

fun extractParent(fa: FinancialAction) {
if (fa.parent != null)
extractParent(fa.parent)
result.add(fa)
}
list.forEach { extractParent(it) }
return result.distinctBy { it.id }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ interface FinancialActionPersister {
suspend fun persist(financialActions: List<FinancialAction>): List<FinancialAction>

suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus)

suspend fun updateBatchStatus(financialAction: List<FinancialAction>, status: FinancialActionStatus)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.nilin.opex.accountant.core.spi

import co.nilin.opex.accountant.core.inout.TransferRequest
import java.math.BigDecimal

interface WalletProxy {
Expand All @@ -15,5 +16,7 @@ interface WalletProxy {
transferRef: String?
)

suspend fun batchTransfer(transfers: List<TransferRequest>)

suspend fun canFulfil(symbol: String, walletType: String, uuid: String, amount: BigDecimal): Boolean
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package co.nilin.opex.accountant.core.service

import co.nilin.opex.accountant.core.model.FinancialActionStatus
import co.nilin.opex.accountant.core.model.FinancialAction
import co.nilin.opex.accountant.core.spi.FinancialActionLoader
import co.nilin.opex.accountant.core.spi.FinancialActionPersister
import co.nilin.opex.accountant.core.spi.WalletProxy
Expand All @@ -9,6 +9,9 @@ import io.mockk.coVerify
import io.mockk.mockk
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import java.math.BigDecimal
import java.time.LocalDateTime
import org.assertj.core.api.Assertions.assertThat

class FAJobManagerImplTest {

Expand All @@ -20,93 +23,36 @@ class FAJobManagerImplTest {

init {
coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.fa, Valid.fa)
coEvery { financialActionPersister.updateStatus(any(), any()) } returns Unit
}

@Test
fun given2FALoaded_whenProcessing_thenVerifyThatTransferProxyCalled2Times() = runBlocking {
coEvery { walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any()) } returns Unit
sut.processFinancialActions(0, 2)
with(Valid.fa) {
coVerify(exactly = 2) {
walletProxy.transfer(
eq(symbol),
eq(senderWalletType),
eq(sender),
eq(receiverWalletType),
eq(receiver),
eq(amount),
eq(eventType + pointer),
any()
)
}
coVerify(exactly = 2) {
financialActionPersister.updateStatus(
eq(this@with),
eq(FinancialActionStatus.PROCESSED)
)
}
}
coEvery { financialActionPersister.updateBatchStatus(any(), any()) } returns Unit
}

@Test
fun given2FALoaded_whenProcessingFailed_thenUpdateStatusCalledRegardless() = runBlocking {
coEvery {
walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any())
walletProxy.batchTransfer(any())
} throws IllegalStateException()

sut.processFinancialActions(0, 2)
with(Valid.fa) {
coVerify(exactly = 2) {
walletProxy.transfer(
eq(symbol),
eq(senderWalletType),
eq(sender),
eq(receiverWalletType),
eq(receiver),
eq(amount),
eq(eventType + pointer),
any()
)
}
coVerify(exactly = 2) {
financialActionPersister.updateStatus(
eq(this@with),
eq(FinancialActionStatus.CREATED)
)
}
coVerify(exactly = 1) {
walletProxy.batchTransfer(any())
}
}

@Test
fun given2FALoaded_whenProcessingFailedAndRetryCountExceeded_thenUpdateStatusCalledRegardless() = runBlocking {
coEvery {
walletProxy.transfer(any(), any(), any(), any(), any(), any(), any(), any())
} throws IllegalStateException()

coEvery { financialActionLoader.loadUnprocessed(any(), any()) } returns listOf(Valid.faHighRetry)

sut.processFinancialActions(0, 1)
with(Valid.faHighRetry) {
coVerify(exactly = 1) {
walletProxy.transfer(
eq(symbol),
eq(senderWalletType),
eq(sender),
eq(receiverWalletType),
eq(receiver),
eq(amount),
eq(eventType + pointer),
any()
)
}
coVerify(exactly = 1) {
financialActionPersister.updateStatus(
eq(this@with),
eq(FinancialActionStatus.ERROR)
)
}
}
fun givenFALoaded_validateParentsAreFirstInLine(): Unit = runBlocking {
val fa1 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 1)
val fa2 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 2)
val fa3 = FinancialAction(fa1, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 3)
val fa4 = FinancialAction(fa3, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 4)
val fa5 = FinancialAction(null, "", "", "", BigDecimal.ZERO, "", "", "", "", LocalDateTime.now(), id = 5)
val list = arrayListOf(fa5, fa4, fa3, fa2, fa1)

val flatten = sut.sortAndFlattenFA(list)

assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa2))
assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa3))
assertThat(flatten.indexOf(fa1)).isLessThan(flatten.indexOf(fa4))
assertThat(flatten.indexOf(fa3)).isLessThan(flatten.indexOf(fa4))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ interface FinancialActionRepository : ReactiveCrudRepository<FinancialActionMode

@Query("update fi_actions set status = :status, retry_count = retry_count + 1 where id = :id")
fun updateStatusAndIncreaseRetry(@Param("id") id: Long, @Param("status") status: FinancialActionStatus): Mono<Int>

@Query("update fi_actions set status = :status where id in (:ids)")
fun updateStatus(ids: List<Long>, status: FinancialActionStatus): Mono<Int>
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ class FinancialActionPersisterImpl(private val financialActionRepository: Financ
override suspend fun updateStatus(financialAction: FinancialAction, status: FinancialActionStatus) {
financialActionRepository.updateStatusAndIncreaseRetry(financialAction.id!!, status).awaitFirstOrNull()
}

override suspend fun updateBatchStatus(financialAction: List<FinancialAction>, status: FinancialActionStatus) {
financialActionRepository.updateStatus(financialAction.mapNotNull { it.id }, status).awaitFirstOrNull()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package co.nilin.opex.accountant.ports.walletproxy.proxy

import co.nilin.opex.accountant.core.inout.TransferRequest
import co.nilin.opex.accountant.core.spi.WalletProxy
import co.nilin.opex.accountant.ports.walletproxy.data.BooleanResponse
import co.nilin.opex.accountant.ports.walletproxy.data.TransferResult
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import org.springframework.beans.factory.annotation.Value
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.body
import org.springframework.web.reactive.function.client.bodyToMono
import reactor.core.publisher.Mono
import java.math.BigDecimal

@Component
Expand All @@ -28,14 +33,26 @@ class WalletProxyImpl(
) {
webClient.post()
.uri("$walletBaseUrl/transfer/${amount}_$symbol/from/${senderUuid}_$senderWalletType/to/${receiverUuid}_$receiverWalletType")
.header("Content-Type", "application/json")
.contentType(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus({ t -> t.isError }, { it.createException() })
.bodyToMono<TransferResult>()
.log()
.awaitFirst()
}

override suspend fun batchTransfer(transfers: List<TransferRequest>) {
webClient.post()
.uri("$walletBaseUrl/transfer/batch")
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(transfers))
.retrieve()
.onStatus({ t -> t.isError }, { it.createException() })
.bodyToMono<TransferResult>()
.log()
.awaitFirstOrNull()
}

override suspend fun canFulfil(symbol: String, walletType: String, uuid: String, amount: BigDecimal): Boolean {
return webClient.get()
.uri("$walletBaseUrl/$uuid/wallet_type/$walletType/can_withdraw/${amount}_$symbol")
Expand Down
2 changes: 1 addition & 1 deletion api/api-ports/api-binance-rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
Expand Down
Loading