From 25cb3a08f8146ef265631554e139b2e7d851ea41 Mon Sep 17 00:00:00 2001 From: Peyman Date: Wed, 3 Nov 2021 17:22:29 +0330 Subject: [PATCH] Fixed scan endpoint and Added demo data --- .../opex/bcgateway/core/model/WalletSync.kt | 11 +++- .../core/service/WalletSyncServiceImpl.kt | 31 ++++++---- .../core/spi/WalletSyncRecordHandler.kt | 9 ++- .../impl/ChainEndpointProxyImpl.kt | 10 ++-- .../postgres/config/DemoPostgresConfig.kt | 59 +++++++++++++------ .../postgres/dao/DepositRepository.kt | 10 +++- .../impl/ChainSyncRecordHandlerImpl.kt | 3 +- .../impl/WalletSyncRecordHandlerImpl.kt | 13 +++- 8 files changed, 102 insertions(+), 44 deletions(-) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt index 439e5e053..8dbca17ec 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt @@ -2,7 +2,14 @@ package co.nilin.opex.bcgateway.core.model import java.time.LocalDateTime -data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?) +data class WalletSyncSchedule( + val retryTime: LocalDateTime, + val delay: Long, + val batchSize: Long? +) + data class WalletSyncRecord( - val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit + val time: LocalDateTime, + val success: Boolean, + val error: String? ) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt index 111f6906a..575d0c992 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt @@ -5,9 +5,7 @@ import co.nilin.opex.bcgateway.core.model.CurrencyImplementation import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.WalletSyncRecord import co.nilin.opex.bcgateway.core.spi.* -import kotlinx.coroutines.ExecutorCoroutineDispatcher -import kotlinx.coroutines.async -import kotlinx.coroutines.withContext +import kotlinx.coroutines.* import org.slf4j.LoggerFactory import java.math.BigDecimal import java.time.LocalDateTime @@ -31,26 +29,33 @@ class WalletSyncServiceImpl( if (schedule != null) { val deposits = walletSyncRecordHandler.findReadyToSyncTransfers(schedule.batchSize) logger.info("syncing ${deposits.size} deposits") - deposits.map { deposit -> + + val result = deposits.map { deposit -> async(dispatcher) { + var deposited = false val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo) if (uuid != null) { logger.info("deposit came for $uuid - to ${deposit.depositor}") val symbol = currencyLoader.findByChainAndTokenAddress(deposit.chain, deposit.tokenAddress) if (symbol != null) { sendDeposit(uuid, symbol, deposit) + deposited = true } } - walletSyncRecordHandler.saveWalletSyncRecord( - WalletSyncRecord( - LocalDateTime.now(), - true, - null, - deposit - ) - ) + Pair(deposit, deposited) } - } + }.awaitAll() + + walletSyncRecordHandler.saveWalletSyncRecord( + WalletSyncRecord( + LocalDateTime.now(), + true, + null + ), + result.filter { it.second }.map { it.first }, + result.filter { !it.second }.map { it.first } + ) + syncSchedulerHandler.prepareScheduleForNextTry( schedule, LocalDateTime.now() .plus(schedule.delay, ChronoUnit.SECONDS) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt index 90979ab28..1d8e02380 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt @@ -4,7 +4,14 @@ import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.WalletSyncRecord interface WalletSyncRecordHandler { + suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) - suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) + + suspend fun saveWalletSyncRecord( + syncRecord: WalletSyncRecord, + sentDeposits: List, + deletingDeposits: List + ) + suspend fun findReadyToSyncTransfers(count: Long?): List } diff --git a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt index 4e0f47be9..8676a3240 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-chain-proxy/src/main/kotlin/co.nilin.opex.port.bcgateway.chainproxy/impl/ChainEndpointProxyImpl.kt @@ -23,8 +23,8 @@ class ChainEndpointProxyImpl( private val chain: String, private val endpoints: List, private val webClient: WebClient -) : - ChainEndpointProxy { +) : ChainEndpointProxy { + data class TransfersRequest( val startBlock: Long?, val endBlock: Long?, @@ -47,10 +47,10 @@ class ChainEndpointProxyImpl( private val logger = LoggerFactory.getLogger(ChainEndpointProxyImpl::class.java) - private suspend fun requestTransferList(baseUrl: String, request: TransfersRequest): DepositResult { - logger.info("request transfers: base=$baseUrl") + private suspend fun requestTransferList(endpoint: String, request: TransfersRequest): DepositResult { + logger.info("request transfers: base=$endpoint") val response = webClient.post() - .uri(URI.create("$baseUrl/transfers")) + .uri(URI.create(endpoint)) .header("Content-Type", "application/json") .body(Mono.just(request)) .retrieve() diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/DemoPostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/DemoPostgresConfig.kt index 7571e4043..84eff19dc 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/DemoPostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/DemoPostgresConfig.kt @@ -11,25 +11,46 @@ class DemoPostgresConfig(db: DatabaseClient) { init { val initDb = db.sql { """ - insert into currency values ('BTC', 'Bitcoin') ON CONFLICT DO NOTHING; - insert into chains values ('Bit') ON CONFLICT DO NOTHING; - insert into chains values ('Bsc') ON CONFLICT DO NOTHING; - insert into address_types(id, address_type, address_regex) values (1, 'BTC', '.*') ON CONFLICT DO NOTHING; - insert into address_types(id, address_type, address_regex) values (2, 'ETH', '.*') ON CONFLICT DO NOTHING; - insert into chain_address_types (chain_name, addr_type_id) values ('Bit', 1) ON CONFLICT DO NOTHING; - insert into chain_address_types (chain_name, addr_type_id) values ('Bsc', 2) ON CONFLICT DO NOTHING; - insert into currency_implementations ( - id, - symbol, - chain, - token, - token_address, - token_name, - withdraw_enabled, - withdraw_fee, - withdraw_min - )values(1, 'BTC', 'Bit', false, null, null, true, 0.0001, 0.0001) - ,(2, 'BTC', 'Bsc', true, '0x1111', 'WBTC', true, 0.00001, 0.000001) ON CONFLICT DO NOTHING; + insert into address_types values + (1, 'bitcoin', '', ''), + (2, 'ethereum', '', '') + ON CONFLICT DO NOTHING; + + insert into chains values + ('bitcoin-testnet'), + ('ethereum-ropsten'), + ('bsc-ropsten') + ON CONFLICT DO NOTHING; + + insert into chain_address_types values + (1, 'bitcoin-testnet', 1), + (2, 'ethereum-ropsten', 2), + (3, 'bsc-ropsten', 2) + ON CONFLICT DO NOTHING; + + insert into currency values + ('BTC', 'Bitcoin'), + ('ETH', 'Ethereum'), + ('USDT', 'Tether') + ON CONFLICT DO NOTHING; + + insert into currency_implementations values + (1, 'BTC', 'bitcoin-testnet', false, null, null, true, 0.00001, 0.00001, 0), + (2, 'ETH', 'ethereum-ropsten', false, null, null, true, 0.0001, 0.0001, 18), + (3, 'USDT', 'ethereum-ropsten', true, '0x110a13fc3efe6a245b50102d2d79b3e76125ae83', 'USDT', true, 0.01, 0.01, 6) + ON CONFLICT DO NOTHING; + + insert into chain_endpoints (id, chain_name, endpoint_url) values + (1, 'bitcoin-testnet', 'http://host.docker.internal:9990/bitcoin/transfers'), + (2, 'ethereum-ropsten', 'http://host.docker.internal:9990/eth/transfers') + ON CONFLICT DO NOTHING; + + insert into chain_sync_schedules values + ('bitcoin-testnet', CURRENT_DATE, 600), + ('ethereum-ropsten', CURRENT_DATE, 90) + ON CONFLICT DO NOTHING; + + insert into wallet_sync_schedules values (1, CURRENT_DATE, 30, 10000) ON CONFLICT DO NOTHING; """ } initDb // initialize the database diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt index 988ed7181..29f84aad3 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt @@ -16,7 +16,7 @@ interface DepositRepository : ReactiveCrudRepository { fun findByChain(chain: String): Flow @Query("select * from deposits where hash in (:hash)") - fun findAllByHash(hash: List):Flow + fun findAllByHash(hash: List): Flow @Query("select * from deposits where chain = :chain and wallet_record_id is null") fun findByChainWhereNotSynced(chain: String): Flow @@ -28,6 +28,10 @@ interface DepositRepository : ReactiveCrudRepository { @Query("update deposits set wallet_record_id = :walletRecordId where id = :id") fun updateWalletSyncRecord(id: Long, walletRecordId: Long): Mono + @Modifying + @Query("update deposits set wallet_record_id = :walletRecordId where id in (:ids)") + fun updateWalletSyncRecords(ids: List, walletRecordId: Long): Mono + @Modifying @Query( """ @@ -54,4 +58,8 @@ interface DepositRepository : ReactiveCrudRepository { memo: String? ): Mono + @Modifying + @Query("delete from deposits where id in (:ids)") + fun deleteSyncedDeposits(ids: List): Mono + } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt index b0bd7a279..3aa811004 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt @@ -41,12 +41,13 @@ class ChainSyncRecordHandlerImpl( @Transactional override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { + val currentRecord = chainSyncRecordRepository.findByChain(syncRecord.chainName).awaitSingleOrNull() val chainSyncRecordDao = ChainSyncRecordModel( syncRecord.chainName, syncRecord.time, syncRecord.endpoint.url, - syncRecord.latestBlock, + syncRecord.latestBlock ?: currentRecord?.latestBlock, syncRecord.success, syncRecord.error ) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt index 37db74dc1..69d65d181 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt @@ -36,7 +36,11 @@ class WalletSyncRecordHandlerImpl( } @Transactional - override suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) { + override suspend fun saveWalletSyncRecord( + syncRecord: WalletSyncRecord, + sentDeposits: List, + deletingDeposits: List + ) { val dao = walletSyncRecordRepository.save( WalletSyncRecordModel( null, @@ -45,7 +49,12 @@ class WalletSyncRecordHandlerImpl( syncRecord.error ) ).awaitFirst() - depositRepository.updateWalletSyncRecord(syncRecord.deposit.id!!, dao.id!!).awaitFirst() + + if (sentDeposits.isNotEmpty()) + depositRepository.updateWalletSyncRecords(sentDeposits.map { it.id ?: -1 }, dao.id!!).awaitFirst() + + if (deletingDeposits.isNotEmpty()) + depositRepository.deleteSyncedDeposits(deletingDeposits.map { it.id ?: -1 }).awaitFirst() } override suspend fun findReadyToSyncTransfers(count: Long?): List {