Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions BlockchainGateway/bc-gateway-ports/bc-chain-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<artifactId>bc-gateway-core</artifactId>
<version>${bc-gateway.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,80 @@
package co.nilin.opex.port.bcgateway.chainproxy.impl

import co.nilin.opex.bcgateway.core.model.ChainSyncRecord
import co.nilin.opex.bcgateway.core.model.Deposit
import co.nilin.opex.bcgateway.core.model.Endpoint
import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy
import org.springframework.stereotype.Component
import kotlinx.coroutines.reactive.awaitFirstOrElse
import org.springframework.core.ParameterizedTypeReference
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.WebClientResponseException
import reactor.core.publisher.Mono
import java.math.BigDecimal
import java.net.URI
import java.time.LocalDateTime

inline fun <reified T : Any> typeRef(): ParameterizedTypeReference<T> = object : ParameterizedTypeReference<T>() {}

class ChainEndpointProxyImpl(
private val chain: String,
private val endpoints: List<Endpoint>,
private val webClient: WebClient
) :
ChainEndpointProxy {
data class TransfersRequest(
val startBlock: Long?,
val endBlock: Long?,
val addresses: List<String>?
)

data class Transfer(
var txHash: String,
var from: String,
var to: String,
var isTokenTransfer: Boolean,
var token: String? = null,
var amount: BigDecimal
)

private suspend fun requestTransferList(baseUrl: String, request: TransfersRequest): List<Deposit> {
return webClient.post()
.uri(URI.create("$baseUrl/transfers"))
.header("Content-Type", "application/json")
.body(Mono.just(request), TransfersRequest::class.java)
.retrieve()
.onStatus({ t -> t.isError }, { it.createException() })
.bodyToFlux(typeRef<Transfer>())
.log().map { Deposit(null, it.to, null, it.amount, chain, it.isTokenTransfer, it.token) }
.collectList()
.awaitFirstOrElse { emptyList() }
}

private suspend fun roundRobin(i: Int, request: TransfersRequest): ChainSyncRecord {
return try {
val deposits =
requestTransferList(
endpoints[i].url,
request
)
ChainSyncRecord(chain, LocalDateTime.now(), endpoints[i], request.endBlock, true, null, deposits)
} catch (error: WebClientResponseException) {
if (i < endpoints.size - 1) {
roundRobin(i + 1, request)
} else {
ChainSyncRecord(
chain,
LocalDateTime.now(),
endpoints[i],
request.endBlock,
false,
error.message,
emptyList()
)
}
}
}

class ChainEndpointProxyImpl(private val endpoints: List<Endpoint>) : ChainEndpointProxy {
override suspend fun syncTransfers(filter: ChainEndpointProxy.DepositFilter): ChainSyncRecord {
TODO("Not yet implemented")
return roundRobin(0, TransfersRequest(filter.startBlock, filter.endBlock, filter.tokenAddresses))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import co.nilin.opex.port.bcgateway.postgres.dao.ChainRepository
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient

@Component
class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository) : ChainEndpointProxyFinder {
class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository, private val webClient: WebClient) :
ChainEndpointProxyFinder {
override suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy {
val endpoints = chainRepository.findEndpointsByName(chainName).map { Endpoint(it.url) }.toList()
return ChainEndpointProxyImpl(endpoints)
return ChainEndpointProxyImpl(chainName, endpoints,webClient)
}
}