diff --git a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml
index ca4affcdc..8924485f2 100644
--- a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml
+++ b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml
@@ -51,6 +51,10 @@
bc-gateway-core
${bc-gateway.version}
+
+ org.springframework
+ spring-webflux
+
diff --git a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt
index bf9e4e217..c214084dd 100644
--- a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt
+++ b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt
@@ -1,12 +1,80 @@
package co.nilin.opex.port.bcgateway.chainproxy.impl
import co.nilin.opex.bcgateway.core.model.ChainSyncRecord
+import co.nilin.opex.bcgateway.core.model.Deposit
import co.nilin.opex.bcgateway.core.model.Endpoint
import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy
-import org.springframework.stereotype.Component
+import kotlinx.coroutines.reactive.awaitFirstOrElse
+import org.springframework.core.ParameterizedTypeReference
+import org.springframework.web.reactive.function.client.WebClient
+import org.springframework.web.reactive.function.client.WebClientResponseException
+import reactor.core.publisher.Mono
+import java.math.BigDecimal
+import java.net.URI
+import java.time.LocalDateTime
+
+inline fun typeRef(): ParameterizedTypeReference = object : ParameterizedTypeReference() {}
+
+class ChainEndpointProxyImpl(
+ private val chain: String,
+ private val endpoints: List,
+ private val webClient: WebClient
+) :
+ ChainEndpointProxy {
+ data class TransfersRequest(
+ val startBlock: Long?,
+ val endBlock: Long?,
+ val addresses: List?
+ )
+
+ data class Transfer(
+ var txHash: String,
+ var from: String,
+ var to: String,
+ var isTokenTransfer: Boolean,
+ var token: String? = null,
+ var amount: BigDecimal
+ )
+
+ private suspend fun requestTransferList(baseUrl: String, request: TransfersRequest): List {
+ return webClient.post()
+ .uri(URI.create("$baseUrl/transfers"))
+ .header("Content-Type", "application/json")
+ .body(Mono.just(request), TransfersRequest::class.java)
+ .retrieve()
+ .onStatus({ t -> t.isError }, { it.createException() })
+ .bodyToFlux(typeRef())
+ .log().map { Deposit(null, it.to, null, it.amount, chain, it.isTokenTransfer, it.token) }
+ .collectList()
+ .awaitFirstOrElse { emptyList() }
+ }
+
+ private suspend fun roundRobin(i: Int, request: TransfersRequest): ChainSyncRecord {
+ return try {
+ val deposits =
+ requestTransferList(
+ endpoints[i].url,
+ request
+ )
+ ChainSyncRecord(chain, LocalDateTime.now(), endpoints[i], request.endBlock, true, null, deposits)
+ } catch (error: WebClientResponseException) {
+ if (i < endpoints.size - 1) {
+ roundRobin(i + 1, request)
+ } else {
+ ChainSyncRecord(
+ chain,
+ LocalDateTime.now(),
+ endpoints[i],
+ request.endBlock,
+ false,
+ error.message,
+ emptyList()
+ )
+ }
+ }
+ }
-class ChainEndpointProxyImpl(private val endpoints: List) : ChainEndpointProxy {
override suspend fun syncTransfers(filter: ChainEndpointProxy.DepositFilter): ChainSyncRecord {
- TODO("Not yet implemented")
+ return roundRobin(0, TransfersRequest(filter.startBlock, filter.endBlock, filter.tokenAddresses))
}
}
diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt
index b0873c9f9..2e15c726a 100644
--- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt
+++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainEndpointProxyFinderImpl.kt
@@ -8,11 +8,13 @@ import co.nilin.opex.port.bcgateway.postgres.dao.ChainRepository
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import org.springframework.stereotype.Component
+import org.springframework.web.reactive.function.client.WebClient
@Component
-class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository) : ChainEndpointProxyFinder {
+class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository, private val webClient: WebClient) :
+ ChainEndpointProxyFinder {
override suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy {
val endpoints = chainRepository.findEndpointsByName(chainName).map { Endpoint(it.url) }.toList()
- return ChainEndpointProxyImpl(endpoints)
+ return ChainEndpointProxyImpl(chainName, endpoints,webClient)
}
}