From 5b503f679285f27f082d6a49f60a825f6acafe19 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sat, 2 Oct 2021 16:43:41 +0330 Subject: [PATCH] Implement chain scan request --- .../bc-gateway-ports/bc-chain-proxy/pom.xml | 4 + .../impl/ChainEndpointProxyImpl.kt | 74 ++++++++++++++++++- .../impl/ChainEndpointProxyFinderImpl.kt | 6 +- 3 files changed, 79 insertions(+), 5 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml index ca4affcdc..8924485f2 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml +++ b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml @@ -51,6 +51,10 @@ bc-gateway-core ${bc-gateway.version} + + org.springframework + spring-webflux + diff --git a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt index bf9e4e217..c214084dd 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt @@ -1,12 +1,80 @@ package co.nilin.opex.port.bcgateway.chainproxy.impl import co.nilin.opex.bcgateway.core.model.ChainSyncRecord +import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy -import org.springframework.stereotype.Component +import kotlinx.coroutines.reactive.awaitFirstOrElse +import org.springframework.core.ParameterizedTypeReference +import org.springframework.web.reactive.function.client.WebClient +import org.springframework.web.reactive.function.client.WebClientResponseException +import reactor.core.publisher.Mono +import java.math.BigDecimal +import java.net.URI +import java.time.LocalDateTime + +inline fun typeRef(): ParameterizedTypeReference = object : ParameterizedTypeReference() {} + +class ChainEndpointProxyImpl( + private val chain: String, + private val endpoints: List, + private val webClient: WebClient +) : + ChainEndpointProxy { + data class TransfersRequest( + val startBlock: Long?, + val endBlock: Long?, + val addresses: List? + ) + + data class Transfer( + var txHash: String, + var from: String, + var to: String, + var isTokenTransfer: Boolean, + var token: String? = null, + var amount: BigDecimal + ) + + private suspend fun requestTransferList(baseUrl: String, request: TransfersRequest): List { + return webClient.post() + .uri(URI.create("$baseUrl/transfers")) + .header("Content-Type", "application/json") + .body(Mono.just(request), TransfersRequest::class.java) + .retrieve() + .onStatus({ t -> t.isError }, { it.createException() }) + .bodyToFlux(typeRef()) + .log().map { Deposit(null, it.to, null, it.amount, chain, it.isTokenTransfer, it.token) } + .collectList() + .awaitFirstOrElse { emptyList() } + } + + private suspend fun roundRobin(i: Int, request: TransfersRequest): ChainSyncRecord { + return try { + val deposits = + requestTransferList( + endpoints[i].url, + request + ) + ChainSyncRecord(chain, LocalDateTime.now(), endpoints[i], request.endBlock, true, null, deposits) + } catch (error: WebClientResponseException) { + if (i < endpoints.size - 1) { + roundRobin(i + 1, request) + } else { + ChainSyncRecord( + chain, + LocalDateTime.now(), + endpoints[i], + request.endBlock, + false, + error.message, + emptyList() + ) + } + } + } -class ChainEndpointProxyImpl(private val endpoints: List) : ChainEndpointProxy { override suspend fun syncTransfers(filter: ChainEndpointProxy.DepositFilter): ChainSyncRecord { - TODO("Not yet implemented") + return roundRobin(0, TransfersRequest(filter.startBlock, filter.endBlock, filter.tokenAddresses)) } } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt index b0873c9f9..2e15c726a 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt @@ -8,11 +8,13 @@ import co.nilin.opex.port.bcgateway.postgres.dao.ChainRepository import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.client.WebClient @Component -class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository) : ChainEndpointProxyFinder { +class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository, private val webClient: WebClient) : + ChainEndpointProxyFinder { override suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy { val endpoints = chainRepository.findEndpointsByName(chainName).map { Endpoint(it.url) }.toList() - return ChainEndpointProxyImpl(endpoints) + return ChainEndpointProxyImpl(chainName, endpoints,webClient) } }