From ff3f0a8b709aece8405b3c4bd9fe88e0ae344b6b Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sat, 11 Sep 2021 13:34:06 +0430 Subject: [PATCH 01/24] Add SyncRecordHandler, WalletSyncRecordHandler raw implementations --- .../postgres/impl/SyncRecordHandlerImpl.kt | 19 +++++++++++++++++++ .../impl/WalletSyncRecordHandlerImpl.kt | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt new file mode 100644 index 000000000..912f10e63 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -0,0 +1,19 @@ +package co.nilin.opex.port.bcgateway.postgres.impl + +import co.nilin.opex.bcgateway.core.model.ChainSyncRecord +import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule +import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler +import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler +import org.springframework.stereotype.Component +import java.time.LocalDateTime + +@Component +class SyncRecordHandlerImpl : SyncRecordHandler { + override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { + TODO("Not yet implemented") + } + + override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt new file mode 100644 index 000000000..1d10ce26f --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt @@ -0,0 +1,19 @@ +package co.nilin.opex.port.bcgateway.postgres.impl + +import co.nilin.opex.bcgateway.core.model.Deposit +import co.nilin.opex.bcgateway.core.model.WalletSyncSchedule +import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler +import co.nilin.opex.bcgateway.core.spi.WalletSyncSchedulerHandler +import org.springframework.stereotype.Component +import java.time.LocalDateTime + +@Component +class WalletSyncRecordHandlerImpl : WalletSyncRecordHandler { + override suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) { + TODO("Not yet implemented") + } + + override suspend fun findReadyToSyncTransfers(count: Long?): List { + TODO("Not yet implemented") + } +} \ No newline at end of file From 4d5e31e7b57f5f897627969d9a3762a55d0baed4 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 11:43:28 +0430 Subject: [PATCH 02/24] Fix some optional columns in postgres module --- .../opex/port/bcgateway/postgres/config/PostgresConfig.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 0804207c9..ae042a6cd 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -54,14 +54,14 @@ class PostgresConfig(db: DatabaseClient) { ); CREATE TABLE IF NOT EXISTS chain_sync_schedules ( chain VARCHAR(72) PRIMARY KEY, - retry_time TIMESTAMP, - delay NUMERIC + retry_time TIMESTAMP NOT NULL, + delay NUMERIC NOT NULL ); CREATE TABLE IF NOT EXISTS chain_sync_records ( chain VARCHAR(72) PRIMARY KEY, time TIMESTAMP NOT NULL, endpoint_url VARCHAR(72) NOT NULL, - latest_block INTEGER NOT NULL, + latest_block INTEGER, success BOOLEAN NOT NULL, error VARCHAR(100) ); From ea4be346a7ab676e7abf7613944d227887762700 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 11:57:32 +0430 Subject: [PATCH 03/24] Implement SyncRecordHandlerImpl.loadLastSuccessRecord() without records list --- .../opex/bcgateway/core/model/WalletSync.kt | 5 +--- .../postgres/dao/ChainSyncRecordRepository.kt | 12 ++++------ .../postgres/impl/SyncRecordHandlerImpl.kt | 23 +++++++++++++++---- .../bcgateway/postgres/model/SyncModel.kt | 2 +- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt index 3f4ec00fa..81fc4115d 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt @@ -4,8 +4,5 @@ import java.time.LocalDateTime data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?) data class WalletSyncRecord( - val time: LocalDateTime - , val success: Boolean - , val error: String? - , val deposit: Deposit + val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit ) \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt index b3f4ee2b0..b0fa51f46 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt @@ -1,13 +1,11 @@ package co.nilin.opex.port.bcgateway.postgres.dao -import co.nilin.opex.port.bcgateway.postgres.model.AssignedAddressChainModel -import co.nilin.opex.port.bcgateway.postgres.model.ChainModel -import co.nilin.opex.port.bcgateway.postgres.model.SyncRecord -import kotlinx.coroutines.flow.Flow -import org.springframework.data.r2dbc.repository.Query -import org.springframework.data.repository.query.Param +import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono @Repository -interface ChainSyncRecordRepository : ReactiveCrudRepository \ No newline at end of file +interface ChainSyncRecordRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Mono +} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt index 912f10e63..4594b48a3 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -1,16 +1,29 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.ChainSyncRecord -import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule +import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler -import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler +import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository +import kotlinx.coroutines.reactive.awaitSingleOrNull import org.springframework.stereotype.Component -import java.time.LocalDateTime @Component -class SyncRecordHandlerImpl : SyncRecordHandler { +class SyncRecordHandlerImpl(private val chainSyncRecordRepository: ChainSyncRecordRepository) : SyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { - TODO("Not yet implemented") + val dao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() + return if (dao !== null) { + ChainSyncRecord( + dao.chain, + dao.time, + Endpoint(dao.endpointUrl), + dao.latestBlock, + dao.success, + dao.error, + emptyList() + ) + } else { + null + } } override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt index a131264ce..161b6af57 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt @@ -11,7 +11,7 @@ data class SyncScheduleModel( ) @Table("chain_sync_record") -data class SyncRecord( +data class SyncRecordModel( @Id @Column("chain") val chain: String, val time: LocalDateTime, @Column("endpoint_url") val endpointUrl: String, From 8ae1fbaf319e7c62366ef57e2f699cca3a995a18 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 12:36:48 +0430 Subject: [PATCH 04/24] Add deposits table and repository --- .../bcgateway/postgres/config/PostgresConfig.kt | 8 ++++++++ .../bcgateway/postgres/dao/DepositRepository.kt | 12 ++++++++++++ .../bcgateway/postgres/model/DepositModel.kt | 16 ++++++++++++++++ 3 files changed, 36 insertions(+) create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index ae042a6cd..6388680d7 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -79,6 +79,14 @@ class PostgresConfig(db: DatabaseClient) { withdraw_fee NUMERIC NOT NULL, withdraw_min NUMERIC NOT NULL ); + CREATE TABLE IF NOT EXISTS deposits ( + id SERIAL PRIMARY KEY, + chain VARCHAR(72), + token_address VARCHAR(72), + amount NUMERIC NOT NULL, + depositor VARCHAR(72) NOT NULL, + depositorMemo VARCHAR(72) + ); """ } initDb // initialize the database diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt new file mode 100644 index 000000000..2f7bbb9df --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt @@ -0,0 +1,12 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.AddressTypeModel +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import kotlinx.coroutines.flow.Flow +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface DepositRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Flow +} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt new file mode 100644 index 000000000..9b76488df --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt @@ -0,0 +1,16 @@ +package co.nilin.opex.port.bcgateway.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table +import java.math.BigDecimal + +@Table("deposits") +data class DepositModel( + @Id val id: Long?, + val depositor: String, + val depositorMemo: String?, + val amount: BigDecimal, + val chain: String?, + val tokenAddress: String? +) \ No newline at end of file From d7b60b4e6dc611d1e1b8ba58bb3df51738145b91 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 12:41:20 +0430 Subject: [PATCH 05/24] Add records to sync record handler --- .../postgres/dao/DepositRepository.kt | 1 - .../postgres/impl/SyncRecordHandlerImpl.kt | 30 ++++++++++++------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt index 2f7bbb9df..f5e21069b 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt @@ -1,6 +1,5 @@ package co.nilin.opex.port.bcgateway.postgres.dao -import co.nilin.opex.port.bcgateway.postgres.model.AddressTypeModel import co.nilin.opex.port.bcgateway.postgres.model.DepositModel import kotlinx.coroutines.flow.Flow import org.springframework.data.repository.reactive.ReactiveCrudRepository diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt index 4594b48a3..e98ee312d 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -1,25 +1,35 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.ChainSyncRecord +import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository +import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitSingleOrNull import org.springframework.stereotype.Component @Component -class SyncRecordHandlerImpl(private val chainSyncRecordRepository: ChainSyncRecordRepository) : SyncRecordHandler { +class SyncRecordHandlerImpl( + private val chainSyncRecordRepository: ChainSyncRecordRepository, + private val depositRepository: DepositRepository +) : SyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { - val dao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() - return if (dao !== null) { + val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() + return if (chainSyncRecordDao !== null) { + val deposits = depositRepository.findByChain(chainName).map { + Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.tokenAddress) + } ChainSyncRecord( - dao.chain, - dao.time, - Endpoint(dao.endpointUrl), - dao.latestBlock, - dao.success, - dao.error, - emptyList() + chainSyncRecordDao.chain, + chainSyncRecordDao.time, + Endpoint(chainSyncRecordDao.endpointUrl), + chainSyncRecordDao.latestBlock, + chainSyncRecordDao.success, + chainSyncRecordDao.error, + deposits.toList() ) } else { null From 26e30b68825ffdebe858eb95be6ea2264918b094 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 13:20:10 +0430 Subject: [PATCH 06/24] Add token flag to deposit --- .../kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt | 9 ++++++++- .../port/bcgateway/postgres/config/PostgresConfig.kt | 1 + .../opex/port/bcgateway/postgres/model/DepositModel.kt | 1 + 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt index 579803a87..9cdaee869 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt @@ -2,4 +2,11 @@ package co.nilin.opex.bcgateway.core.model import java.math.BigDecimal -data class Deposit(val depositor: String, val depositorMemo: String?, val amount: BigDecimal, val chain: String?, val tokenAddress: String?) +data class Deposit( + val depositor: String, + val depositorMemo: String?, + val amount: BigDecimal, + val chain: String?, + val token: Boolean, + val tokenAddress: String? +) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 6388680d7..14c3890df 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -82,6 +82,7 @@ class PostgresConfig(db: DatabaseClient) { CREATE TABLE IF NOT EXISTS deposits ( id SERIAL PRIMARY KEY, chain VARCHAR(72), + token BOOLEAN NOT NULL, token_address VARCHAR(72), amount NUMERIC NOT NULL, depositor VARCHAR(72) NOT NULL, diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt index 9b76488df..d79321a48 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt @@ -12,5 +12,6 @@ data class DepositModel( val depositorMemo: String?, val amount: BigDecimal, val chain: String?, + val token: Boolean, val tokenAddress: String? ) \ No newline at end of file From 865ddbf5f2f7b438e87649f7f41029f1b3c005a6 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 13:21:46 +0430 Subject: [PATCH 07/24] Implement SyncRecordHandlerImpl.saveSyncRecord() --- .../postgres/impl/SyncRecordHandlerImpl.kt | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt index e98ee312d..2a708fba3 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -6,10 +6,14 @@ import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirst import kotlinx.coroutines.reactive.awaitSingleOrNull import org.springframework.stereotype.Component +import reactor.core.publisher.Mono @Component class SyncRecordHandlerImpl( @@ -20,7 +24,7 @@ class SyncRecordHandlerImpl( val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() return if (chainSyncRecordDao !== null) { val deposits = depositRepository.findByChain(chainName).map { - Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.tokenAddress) + Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) } ChainSyncRecord( chainSyncRecordDao.chain, @@ -37,6 +41,27 @@ class SyncRecordHandlerImpl( } override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { - TODO("Not yet implemented") + val chainSyncRecordDao = + SyncRecordModel( + syncRecord.chainName, + syncRecord.time, + syncRecord.endpoint.url, + syncRecord.latestBlock, + syncRecord.success, + syncRecord.error + ) + val deposits = syncRecord.records.map { + DepositModel( + null, + it.depositor, + it.depositorMemo, + it.amount, + it.chain, + it.token, + it.tokenAddress + ) + } + Mono.`when`(chainSyncRecordRepository.save(chainSyncRecordDao), depositRepository.saveAll(deposits)) + .awaitFirst() } } \ No newline at end of file From 0630720b3089c73cdfbc483d62413fabd79f4ae2 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 14:00:01 +0430 Subject: [PATCH 08/24] Add transactional saving ability to sync records --- .../bcgateway/postgres/impl/SyncRecordHandlerImpl.kt | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt index 2a708fba3..b456fe05b 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -8,12 +8,14 @@ import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository import co.nilin.opex.port.bcgateway.postgres.model.DepositModel import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel +import kotlinx.coroutines.async import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst import kotlinx.coroutines.reactive.awaitSingleOrNull +import kotlinx.coroutines.runBlocking import org.springframework.stereotype.Component -import reactor.core.publisher.Mono +import org.springframework.transaction.annotation.Transactional @Component class SyncRecordHandlerImpl( @@ -22,6 +24,7 @@ class SyncRecordHandlerImpl( ) : SyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() + return if (chainSyncRecordDao !== null) { val deposits = depositRepository.findByChain(chainName).map { Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) @@ -40,6 +43,7 @@ class SyncRecordHandlerImpl( } } + @Transactional override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { val chainSyncRecordDao = SyncRecordModel( @@ -50,6 +54,7 @@ class SyncRecordHandlerImpl( syncRecord.success, syncRecord.error ) + chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() val deposits = syncRecord.records.map { DepositModel( null, @@ -61,7 +66,6 @@ class SyncRecordHandlerImpl( it.tokenAddress ) } - Mono.`when`(chainSyncRecordRepository.save(chainSyncRecordDao), depositRepository.saveAll(deposits)) - .awaitFirst() + depositRepository.saveAll(deposits).awaitFirst() } } \ No newline at end of file From 075b90830a239a8311263b33b8393ae57219f897 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 14:01:04 +0430 Subject: [PATCH 09/24] Organize sync record imports --- .../opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt index b456fe05b..dac616392 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -8,12 +8,10 @@ import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository import co.nilin.opex.port.bcgateway.postgres.model.DepositModel import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel -import kotlinx.coroutines.async import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst import kotlinx.coroutines.reactive.awaitSingleOrNull -import kotlinx.coroutines.runBlocking import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional From ba26622e94a86adb4beebf2e46a6ef3ad3171ad3 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 14:48:15 +0430 Subject: [PATCH 10/24] Implement saveReadyToSyncTransfers --- .../postgres/impl/SyncRecordHandlerImpl.kt | 5 ++-- .../impl/WalletSyncRecordHandlerImpl.kt | 25 +++++++++++++++---- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt index dac616392..319b8e4c0 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -22,7 +22,6 @@ class SyncRecordHandlerImpl( ) : SyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() - return if (chainSyncRecordDao !== null) { val deposits = depositRepository.findByChain(chainName).map { Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) @@ -53,7 +52,7 @@ class SyncRecordHandlerImpl( syncRecord.error ) chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() - val deposits = syncRecord.records.map { + val depositsDao = syncRecord.records.map { DepositModel( null, it.depositor, @@ -64,6 +63,6 @@ class SyncRecordHandlerImpl( it.tokenAddress ) } - depositRepository.saveAll(deposits).awaitFirst() + depositRepository.saveAll(depositsDao).awaitFirst() } } \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt index 1d10ce26f..3addb0f6f 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt @@ -1,16 +1,31 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.Deposit -import co.nilin.opex.bcgateway.core.model.WalletSyncSchedule import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler -import co.nilin.opex.bcgateway.core.spi.WalletSyncSchedulerHandler +import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import kotlinx.coroutines.reactive.awaitFirst import org.springframework.stereotype.Component -import java.time.LocalDateTime +import org.springframework.transaction.annotation.Transactional @Component -class WalletSyncRecordHandlerImpl : WalletSyncRecordHandler { +class WalletSyncRecordHandlerImpl( + private val depositRepository: DepositRepository +) : WalletSyncRecordHandler { + @Transactional override suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) { - TODO("Not yet implemented") + val depositsDao = deposits.map { + DepositModel( + null, + it.depositor, + it.depositorMemo, + it.amount, + it.chain, + it.token, + it.tokenAddress + ) + } + depositRepository.saveAll(depositsDao).awaitFirst() } override suspend fun findReadyToSyncTransfers(count: Long?): List { From 91c9cb3327771d4f69093333e57818ab962aead9 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 15:16:40 +0430 Subject: [PATCH 11/24] Fix SyncModel table names --- .../co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt index 161b6af57..d7093e80a 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt @@ -5,12 +5,12 @@ import org.springframework.data.relational.core.mapping.Column import org.springframework.data.relational.core.mapping.Table import java.time.LocalDateTime -@Table("chain_sync_schedule") +@Table("chain_sync_schedules") data class SyncScheduleModel( @Id @Column("chain") val chain: String, @Column("retry_time") val retryTime: LocalDateTime, val delay: Long ) -@Table("chain_sync_record") +@Table("chain_sync_records") data class SyncRecordModel( @Id @Column("chain") val chain: String, val time: LocalDateTime, From 23d36cc4f2e1e3bdc76080ae15acb7a3fad4855a Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 15:31:25 +0430 Subject: [PATCH 12/24] Add wallet sync schedule table and repository --- .../co/nilin/opex/bcgateway/core/model/WalletSync.kt | 5 +---- .../port/bcgateway/postgres/config/PostgresConfig.kt | 6 ++++++ .../postgres/dao/WalletSyncScheduleRepository.kt | 8 ++++++++ .../port/bcgateway/postgres/model/WalletSyncModel.kt | 10 ++++++++++ 4 files changed, 25 insertions(+), 4 deletions(-) create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt index 3f4ec00fa..81fc4115d 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt @@ -4,8 +4,5 @@ import java.time.LocalDateTime data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?) data class WalletSyncRecord( - val time: LocalDateTime - , val success: Boolean - , val error: String? - , val deposit: Deposit + val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit ) \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 0804207c9..0edc70dbe 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -79,6 +79,12 @@ class PostgresConfig(db: DatabaseClient) { withdraw_fee NUMERIC NOT NULL, withdraw_min NUMERIC NOT NULL ); + CREATE TABLE IF NOT EXISTS wallet_sync_schedules ( + id SERIAL PRIMARY KEY, + retry_time TIMESTAMP NOT NULL, + delay NUMERIC NOT NULL, + batch_size INTEGER + ); """ } initDb // initialize the database diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt new file mode 100644 index 000000000..9c7e282d5 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface WalletSyncScheduleRepository : ReactiveCrudRepository \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt new file mode 100644 index 000000000..9d97bc548 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.port.bcgateway.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Table +import java.time.LocalDateTime + +@Table("wallet_sync_schedules") +data class WalletSyncScheduleModel( + @Id val id: Long?, val retryTime: LocalDateTime, val delay: Long, val batchSize: Long? +) From 3725299536c55aafc728733c19cb911b575fe229 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Sun, 12 Sep 2021 15:34:37 +0430 Subject: [PATCH 13/24] Improve column types --- .../opex/port/bcgateway/postgres/config/PostgresConfig.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 0edc70dbe..a7bf7cb24 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -54,8 +54,8 @@ class PostgresConfig(db: DatabaseClient) { ); CREATE TABLE IF NOT EXISTS chain_sync_schedules ( chain VARCHAR(72) PRIMARY KEY, - retry_time TIMESTAMP, - delay NUMERIC + retry_time TIMESTAMP NOT NULL, + delay INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS chain_sync_records ( chain VARCHAR(72) PRIMARY KEY, @@ -82,7 +82,7 @@ class PostgresConfig(db: DatabaseClient) { CREATE TABLE IF NOT EXISTS wallet_sync_schedules ( id SERIAL PRIMARY KEY, retry_time TIMESTAMP NOT NULL, - delay NUMERIC NOT NULL, + delay INTEGER NOT NULL, batch_size INTEGER ); """ From 83c76c0fdee9756db9b146d936d5f4c00b5e5940 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 12:24:20 +0430 Subject: [PATCH 14/24] Update wallet sync record --- .../main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt | 2 -- .../kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt index c591687f8..d88430445 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt @@ -1,6 +1,5 @@ package co.nilin.opex.bcgateway.core.model -import java.math.BigDecimal import java.time.LocalDateTime data class Endpoint(val url: String) @@ -15,4 +14,3 @@ data class ChainSyncRecord( val error: String?, val records: List ) - diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt index 81fc4115d..fd4eb056c 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt @@ -4,5 +4,5 @@ import java.time.LocalDateTime data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?) data class WalletSyncRecord( - val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit -) \ No newline at end of file + val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: List +) From 22d31dd43adac0c6a14e5da843789ebf46fece83 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 12:31:51 +0430 Subject: [PATCH 15/24] Update sync record names --- .../opex/bcgateway/app/config/AppConfig.kt | 10 +++--- .../core/service/ChainSyncServiceImpl.kt | 14 ++++----- ...rdHandler.kt => ChainSyncRecordHandler.kt} | 4 +-- ...andler.kt => ChainSyncSchedulerHandler.kt} | 4 +-- .../core/service/ChainSyncServiceImplTest.kt | 31 +++++++++---------- .../postgres/dao/ChainSyncRecordRepository.kt | 8 ++--- .../dao/ChainSyncScheduleRepository.kt | 9 ++---- ...rImpl.kt => ChainSyncRecordHandlerImpl.kt} | 12 +++---- ...pl.kt => ChainSyncSchedulerHandlerImpl.kt} | 6 ++-- .../model/{SyncModel.kt => ChainSyncModel.kt} | 6 ++-- 10 files changed, 48 insertions(+), 56 deletions(-) rename BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/{SyncRecordHandler.kt => ChainSyncRecordHandler.kt} (86%) rename BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/{SyncSchedulerHandler.kt => ChainSyncSchedulerHandler.kt} (88%) rename BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/{SyncRecordHandlerImpl.kt => ChainSyncRecordHandlerImpl.kt} (91%) rename BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/{SyncSchedulerHandlerImpl.kt => ChainSyncSchedulerHandlerImpl.kt} (78%) rename BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/{SyncModel.kt => ChainSyncModel.kt} (90%) diff --git a/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt b/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt index 58cc98ade..831ed2b07 100644 --- a/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt +++ b/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt @@ -28,17 +28,17 @@ class AppConfig { @Bean fun chainSyncService( - syncSchedulerHandler: SyncSchedulerHandler, + chainSyncSchedulerHandler: ChainSyncSchedulerHandler, chainEndpointProxyFinder: ChainEndpointProxyFinder, - syncRecordHandler: SyncRecordHandler, + chainSyncRecordHandler: ChainSyncRecordHandler, walletSyncRecordHandler: WalletSyncRecordHandler, currencyLoader: CurrencyLoader, operator: TransactionalOperator ): ChainSyncService { return ChainSyncServiceImpl( - syncSchedulerHandler, + chainSyncSchedulerHandler, chainEndpointProxyFinder, - syncRecordHandler, + chainSyncRecordHandler, walletSyncRecordHandler, currencyLoader, operator, @@ -61,4 +61,4 @@ class AppConfig { fun infoService(): InfoService { return InfoServiceImpl() } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt index 9bfb97219..f8f8f4144 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt @@ -10,9 +10,9 @@ import java.time.temporal.ChronoUnit import kotlin.coroutines.coroutineContext open class ChainSyncServiceImpl( - private val syncSchedulerHandler: SyncSchedulerHandler, + private val chainSyncSchedulerHandler: ChainSyncSchedulerHandler, private val chainEndpointProxyFinder: ChainEndpointProxyFinder, - private val syncRecordHandler: SyncRecordHandler, + private val chainSyncRecordHandler: ChainSyncRecordHandler, private val walletSyncRecordHandler: WalletSyncRecordHandler, private val currencyLoader: CurrencyLoader, private val operator: TransactionalOperator, @@ -21,11 +21,11 @@ open class ChainSyncServiceImpl( override suspend fun startSyncWithChain() { withContext(coroutineContext) { - val schedules = syncSchedulerHandler.fetchActiveSchedules(currentTime()) + val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime()) schedules.map { syncSchedule -> async(dispatcher) { val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName) - val lastSync = syncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName) + val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName) val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName) .map { impl -> impl.tokenAddress!! } .toList() @@ -37,9 +37,9 @@ open class ChainSyncServiceImpl( ) operator.executeAndAwait { walletSyncRecordHandler.saveReadyToSyncTransfers(syncResult.chainName, syncResult.records) - syncRecordHandler.saveSyncRecord(syncResult) + chainSyncRecordHandler.saveSyncRecord(syncResult) if (syncResult.success) { - syncSchedulerHandler.prepareScheduleForNextTry( + chainSyncSchedulerHandler.prepareScheduleForNextTry( syncSchedule, currentTime().plus(syncSchedule.delay, ChronoUnit.SECONDS) ) @@ -51,4 +51,4 @@ open class ChainSyncServiceImpl( } protected open fun currentTime() = LocalDateTime.now() -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncRecordHandler.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRecordHandler.kt similarity index 86% rename from BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncRecordHandler.kt rename to BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRecordHandler.kt index d7803c28a..e8cc7106a 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncRecordHandler.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRecordHandler.kt @@ -2,7 +2,7 @@ package co.nilin.opex.bcgateway.core.spi import co.nilin.opex.bcgateway.core.model.ChainSyncRecord -interface SyncRecordHandler { +interface ChainSyncRecordHandler { suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncSchedulerHandler.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt similarity index 88% rename from BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncSchedulerHandler.kt rename to BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt index be763513e..a04e498af 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncSchedulerHandler.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt @@ -3,7 +3,7 @@ package co.nilin.opex.bcgateway.core.spi import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule import java.time.LocalDateTime -interface SyncSchedulerHandler { +interface ChainSyncSchedulerHandler { suspend fun fetchActiveSchedules(time: LocalDateTime): List suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt b/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt index 75115b8f3..f9997357e 100644 --- a/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt +++ b/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt @@ -6,8 +6,8 @@ import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxyFinder import co.nilin.opex.bcgateway.core.spi.CurrencyLoader -import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler -import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler +import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler +import co.nilin.opex.bcgateway.core.spi.ChainSyncSchedulerHandler import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler import co.nilin.opex.bcgateway.test.OPERATOR import java.time.LocalDateTime @@ -23,7 +23,6 @@ import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.verify -import org.mockito.kotlin.verifyNoMoreInteractions import org.mockito.kotlin.verifyZeroInteractions internal class ChainSyncServiceImplTest { @@ -34,13 +33,13 @@ internal class ChainSyncServiceImplTest { val syncService: ChainSyncServiceImpl @Mock - lateinit var syncSchedulerHandler: SyncSchedulerHandler + lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler @Mock lateinit var chainEndpointProxyFinder: ChainEndpointProxyFinder @Mock - lateinit var syncRecordHandler: SyncRecordHandler + lateinit var chainSyncRecordHandler: ChainSyncRecordHandler @Mock lateinit var walletSyncRecordHandler: WalletSyncRecordHandler @@ -59,9 +58,9 @@ internal class ChainSyncServiceImplTest { } syncService = object : ChainSyncServiceImpl( - syncSchedulerHandler, + chainSyncSchedulerHandler, chainEndpointProxyFinder, - syncRecordHandler, + chainSyncRecordHandler, walletSyncRecordHandler, currencyLoader, OPERATOR, @@ -75,7 +74,7 @@ internal class ChainSyncServiceImplTest { fun givenNoActiveSchedules_whenStartSync_thenNoOp() { runBlocking { //given - Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList()) + Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList()) //when syncService.startSyncWithChain() @@ -83,7 +82,7 @@ internal class ChainSyncServiceImplTest { //then verifyZeroInteractions( chainEndpointProxyFinder, - syncRecordHandler, + chainSyncRecordHandler, walletSyncRecordHandler, currencyLoader ) @@ -96,7 +95,7 @@ internal class ChainSyncServiceImplTest { //given val delay = 100L val syncSchedule = ChainSyncSchedule(ethChain, time, delay) - Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())) + Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())) .thenReturn(listOf(syncSchedule)) Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn( ChainSyncRecord( @@ -108,9 +107,9 @@ internal class ChainSyncServiceImplTest { syncService.startSyncWithChain() //then - verify(syncRecordHandler).saveSyncRecord(any()) + verify(chainSyncRecordHandler).saveSyncRecord(any()) verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any()) - verify(syncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS)) + verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS)) } } @@ -120,7 +119,7 @@ internal class ChainSyncServiceImplTest { //given val delay = 100L val syncSchedule = ChainSyncSchedule(ethChain, time, delay) - Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())) + Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())) .thenReturn(listOf(syncSchedule)) Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn( ChainSyncRecord( @@ -132,11 +131,11 @@ internal class ChainSyncServiceImplTest { syncService.startSyncWithChain() //then - verify(syncRecordHandler).saveSyncRecord(any()) + verify(chainSyncRecordHandler).saveSyncRecord(any()) verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any()) - verify(syncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any()) + verify(chainSyncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any()) } } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt index b0fa51f46..20044463e 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt @@ -1,11 +1,11 @@ package co.nilin.opex.port.bcgateway.postgres.dao -import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncRecordModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository import reactor.core.publisher.Mono @Repository -interface ChainSyncRecordRepository : ReactiveCrudRepository { - fun findByChain(chain: String): Mono -} \ No newline at end of file +interface ChainSyncRecordRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Mono +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt index 48a7a0252..213ecb5b2 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt @@ -1,13 +1,8 @@ package co.nilin.opex.port.bcgateway.postgres.dao -import co.nilin.opex.port.bcgateway.postgres.model.AssignedAddressChainModel -import co.nilin.opex.port.bcgateway.postgres.model.ChainModel -import co.nilin.opex.port.bcgateway.postgres.model.SyncScheduleModel -import kotlinx.coroutines.flow.Flow -import org.springframework.data.r2dbc.repository.Query -import org.springframework.data.repository.query.Param +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncScheduleModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository @Repository -interface ChainSyncScheduleRepository : ReactiveCrudRepository \ No newline at end of file +interface ChainSyncScheduleRepository : ReactiveCrudRepository diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt similarity index 91% rename from BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt rename to BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt index 319b8e4c0..e77119225 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt @@ -3,11 +3,11 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.ChainSyncRecord import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.Endpoint -import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler +import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository import co.nilin.opex.port.bcgateway.postgres.model.DepositModel -import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncRecordModel import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst @@ -16,10 +16,10 @@ import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @Component -class SyncRecordHandlerImpl( +class ChainSyncRecordHandlerImpl( private val chainSyncRecordRepository: ChainSyncRecordRepository, private val depositRepository: DepositRepository -) : SyncRecordHandler { +) : ChainSyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() return if (chainSyncRecordDao !== null) { @@ -43,7 +43,7 @@ class SyncRecordHandlerImpl( @Transactional override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { val chainSyncRecordDao = - SyncRecordModel( + ChainSyncRecordModel( syncRecord.chainName, syncRecord.time, syncRecord.endpoint.url, @@ -65,4 +65,4 @@ class SyncRecordHandlerImpl( } depositRepository.saveAll(depositsDao).awaitFirst() } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncSchedulerHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt similarity index 78% rename from BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncSchedulerHandlerImpl.kt rename to BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt index 89c6e399b..c9a9f34a7 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncSchedulerHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt @@ -1,12 +1,12 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule -import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler +import co.nilin.opex.bcgateway.core.spi.ChainSyncSchedulerHandler import org.springframework.stereotype.Component import java.time.LocalDateTime @Component -class SyncSchedulerHandlerImpl: SyncSchedulerHandler { +class ChainSyncSchedulerHandlerImpl: ChainSyncSchedulerHandler { override suspend fun fetchActiveSchedules(time: LocalDateTime): List { TODO("Not yet implemented") } @@ -14,4 +14,4 @@ class SyncSchedulerHandlerImpl: SyncSchedulerHandler { override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) { TODO("Not yet implemented") } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncModel.kt similarity index 90% rename from BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt rename to BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncModel.kt index d7093e80a..c77b3da05 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncModel.kt @@ -6,12 +6,12 @@ import org.springframework.data.relational.core.mapping.Table import java.time.LocalDateTime @Table("chain_sync_schedules") -data class SyncScheduleModel( +data class ChainSyncScheduleModel( @Id @Column("chain") val chain: String, @Column("retry_time") val retryTime: LocalDateTime, val delay: Long ) @Table("chain_sync_records") -data class SyncRecordModel( +data class ChainSyncRecordModel( @Id @Column("chain") val chain: String, val time: LocalDateTime, @Column("endpoint_url") val endpointUrl: String, @@ -19,5 +19,3 @@ data class SyncRecordModel( val success: Boolean, val error: String? ) - - From fc54dba862f32a820c0ed748f1410ada61eb9c4e Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 13:33:27 +0430 Subject: [PATCH 16/24] Separate chain sync and wallet sync deposits --- .../postgres/config/PostgresConfig.kt | 21 ++++++++++++++++++- .../dao/ChainSyncDepositRepository.kt | 11 ++++++++++ .../postgres/dao/DepositRepository.kt | 11 ---------- .../impl/ChainSyncRecordHandlerImpl.kt | 12 +++++------ .../impl/WalletSyncRecordHandlerImpl.kt | 12 +++++------ ...positModel.kt => ChainSyncDepositModel.kt} | 6 +++--- .../postgres/model/WalletSyncDepositModel.kt | 17 +++++++++++++++ .../postgres/model/WalletSyncModel.kt | 12 +++++++++++ 8 files changed, 75 insertions(+), 27 deletions(-) create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt delete mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt rename BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/{DepositModel.kt => ChainSyncDepositModel.kt} (86%) create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 3f92ee992..057f1707e 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -79,7 +79,7 @@ class PostgresConfig(db: DatabaseClient) { withdraw_fee NUMERIC NOT NULL, withdraw_min NUMERIC NOT NULL ); - CREATE TABLE IF NOT EXISTS deposits ( + CREATE TABLE IF NOT EXISTS chain_sync_deposits ( id SERIAL PRIMARY KEY, chain VARCHAR(72), token BOOLEAN NOT NULL, @@ -88,12 +88,31 @@ class PostgresConfig(db: DatabaseClient) { depositor VARCHAR(72) NOT NULL, depositorMemo VARCHAR(72) ); + CREATE TABLE IF NOT EXISTS wallet_sync_deposits ( + id SERIAL PRIMARY KEY, + wallet_sync_record INTEGER NOT NULL, + chain VARCHAR(72), + token BOOLEAN NOT NULL, + token_address VARCHAR(72), + amount NUMERIC NOT NULL, + depositor VARCHAR(72) NOT NULL, + depositorMemo VARCHAR(72) + ); CREATE TABLE IF NOT EXISTS wallet_sync_schedules ( id SERIAL PRIMARY KEY, retry_time TIMESTAMP NOT NULL, delay INTEGER NOT NULL, batch_size INTEGER ); + CREATE TABLE IF NOT EXISTS wallet_sync_records ( + id SERIAL PRIMARY KEY, + chain VARCHAR(72), + time TIMESTAMP NOT NULL, + endpoint_url VARCHAR(72) NOT NULL, + latest_block INTEGER, + success BOOLEAN NOT NULL, + error VARCHAR(100) + ); """ } initDb // initialize the database diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt new file mode 100644 index 000000000..04d5e1237 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel +import kotlinx.coroutines.flow.Flow +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface ChainSyncDepositRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Flow +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt deleted file mode 100644 index f5e21069b..000000000 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt +++ /dev/null @@ -1,11 +0,0 @@ -package co.nilin.opex.port.bcgateway.postgres.dao - -import co.nilin.opex.port.bcgateway.postgres.model.DepositModel -import kotlinx.coroutines.flow.Flow -import org.springframework.data.repository.reactive.ReactiveCrudRepository -import org.springframework.stereotype.Repository - -@Repository -interface DepositRepository : ReactiveCrudRepository { - fun findByChain(chain: String): Flow -} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt index e77119225..37bfb4571 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt @@ -5,8 +5,8 @@ import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository -import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository -import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncDepositRepository +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncRecordModel import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList @@ -18,12 +18,12 @@ import org.springframework.transaction.annotation.Transactional @Component class ChainSyncRecordHandlerImpl( private val chainSyncRecordRepository: ChainSyncRecordRepository, - private val depositRepository: DepositRepository + private val chainSyncDepositRepository: ChainSyncDepositRepository ) : ChainSyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() return if (chainSyncRecordDao !== null) { - val deposits = depositRepository.findByChain(chainName).map { + val deposits = chainSyncDepositRepository.findByChain(chainName).map { Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) } ChainSyncRecord( @@ -53,7 +53,7 @@ class ChainSyncRecordHandlerImpl( ) chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() val depositsDao = syncRecord.records.map { - DepositModel( + ChainSyncDepositModel( null, it.depositor, it.depositorMemo, @@ -63,6 +63,6 @@ class ChainSyncRecordHandlerImpl( it.tokenAddress ) } - depositRepository.saveAll(depositsDao).awaitFirst() + chainSyncDepositRepository.saveAll(depositsDao).awaitFirst() } } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt index 3addb0f6f..69e884da3 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt @@ -2,20 +2,20 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler -import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository -import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncDepositRepository +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel import kotlinx.coroutines.reactive.awaitFirst import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @Component class WalletSyncRecordHandlerImpl( - private val depositRepository: DepositRepository + private val chainSyncDepositRepository: ChainSyncDepositRepository ) : WalletSyncRecordHandler { @Transactional override suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) { val depositsDao = deposits.map { - DepositModel( + ChainSyncDepositModel( null, it.depositor, it.depositorMemo, @@ -25,10 +25,10 @@ class WalletSyncRecordHandlerImpl( it.tokenAddress ) } - depositRepository.saveAll(depositsDao).awaitFirst() + chainSyncDepositRepository.saveAll(depositsDao).awaitFirst() } override suspend fun findReadyToSyncTransfers(count: Long?): List { TODO("Not yet implemented") } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt similarity index 86% rename from BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt rename to BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt index d79321a48..3befbd9da 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt @@ -5,8 +5,8 @@ import org.springframework.data.relational.core.mapping.Column import org.springframework.data.relational.core.mapping.Table import java.math.BigDecimal -@Table("deposits") -data class DepositModel( +@Table("chain_sync_deposits") +data class ChainSyncDepositModel( @Id val id: Long?, val depositor: String, val depositorMemo: String?, @@ -14,4 +14,4 @@ data class DepositModel( val chain: String?, val token: Boolean, val tokenAddress: String? -) \ No newline at end of file +) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt new file mode 100644 index 000000000..059aae74e --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.port.bcgateway.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Table +import java.math.BigDecimal + +@Table("wallet_sync_deposits") +data class WalletSyncDepositModel( + @Id val id: Long?, + val walletSyncRecord: Long, + val depositor: String, + val depositorMemo: String?, + val amount: BigDecimal, + val chain: String?, + val token: Boolean, + val tokenAddress: String? +) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt index 9d97bc548..7ca748374 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt @@ -1,6 +1,7 @@ package co.nilin.opex.port.bcgateway.postgres.model import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column import org.springframework.data.relational.core.mapping.Table import java.time.LocalDateTime @@ -8,3 +9,14 @@ import java.time.LocalDateTime data class WalletSyncScheduleModel( @Id val id: Long?, val retryTime: LocalDateTime, val delay: Long, val batchSize: Long? ) + +@Table("wallet_sync_records") +data class WalletSyncRecordModel( + @Id val id: Long?, + val chain: String, + val time: LocalDateTime, + val endpointUrl: String, + val latestBlock: Long?, + val success: Boolean, + val error: String? +) From 9d6f5e290e8a3263e1fb901da5957be86b4c02df Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 13:39:57 +0430 Subject: [PATCH 17/24] Make chain a required property in deposit --- .../kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt | 2 +- .../opex/port/bcgateway/postgres/config/PostgresConfig.kt | 6 +++--- .../port/bcgateway/postgres/model/ChainSyncDepositModel.kt | 2 +- .../port/bcgateway/postgres/model/WalletSyncDepositModel.kt | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt index 9cdaee869..f2263ff72 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt @@ -6,7 +6,7 @@ data class Deposit( val depositor: String, val depositorMemo: String?, val amount: BigDecimal, - val chain: String?, + val chain: String, val token: Boolean, val tokenAddress: String? ) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 057f1707e..076e20230 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -81,7 +81,7 @@ class PostgresConfig(db: DatabaseClient) { ); CREATE TABLE IF NOT EXISTS chain_sync_deposits ( id SERIAL PRIMARY KEY, - chain VARCHAR(72), + chain VARCHAR(72) NOT NULL, token BOOLEAN NOT NULL, token_address VARCHAR(72), amount NUMERIC NOT NULL, @@ -91,7 +91,7 @@ class PostgresConfig(db: DatabaseClient) { CREATE TABLE IF NOT EXISTS wallet_sync_deposits ( id SERIAL PRIMARY KEY, wallet_sync_record INTEGER NOT NULL, - chain VARCHAR(72), + chain VARCHAR(72) NOT NULL, token BOOLEAN NOT NULL, token_address VARCHAR(72), amount NUMERIC NOT NULL, @@ -106,7 +106,7 @@ class PostgresConfig(db: DatabaseClient) { ); CREATE TABLE IF NOT EXISTS wallet_sync_records ( id SERIAL PRIMARY KEY, - chain VARCHAR(72), + chain VARCHAR(72) NOT NULL, time TIMESTAMP NOT NULL, endpoint_url VARCHAR(72) NOT NULL, latest_block INTEGER, diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt index 3befbd9da..b36e36196 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt @@ -11,7 +11,7 @@ data class ChainSyncDepositModel( val depositor: String, val depositorMemo: String?, val amount: BigDecimal, - val chain: String?, + val chain: String, val token: Boolean, val tokenAddress: String? ) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt index 059aae74e..bde6b818c 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt @@ -11,7 +11,7 @@ data class WalletSyncDepositModel( val depositor: String, val depositorMemo: String?, val amount: BigDecimal, - val chain: String?, + val chain: String, val token: Boolean, val tokenAddress: String? ) From a63193b170ac154380e8d41115e79109c73e89e3 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 13:42:20 +0430 Subject: [PATCH 18/24] Fix wallet sync record model --- .../postgres/config/PostgresConfig.kt | 45 +++++++++---------- .../postgres/model/WalletSyncModel.kt | 3 -- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 076e20230..bfa28b593 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -65,19 +65,17 @@ class PostgresConfig(db: DatabaseClient) { success BOOLEAN NOT NULL, error VARCHAR(100) ); - CREATE TABLE IF NOT EXISTS currency ( - symbol VARCHAR(72) PRIMARY KEY, - name VARCHAR(72) NOT NULL + CREATE TABLE IF NOT EXISTS wallet_sync_schedules ( + id SERIAL PRIMARY KEY, + retry_time TIMESTAMP NOT NULL, + delay INTEGER NOT NULL, + batch_size INTEGER ); - CREATE TABLE IF NOT EXISTS currency_implementations ( - symbol VARCHAR(72) PRIMARY KEY, - chain VARCHAR(72) NOT NULL, - token BOOLEAN NOT NULL, - token_address VARCHAR(72), - token_name VARCHAR(72), - withdraw_enabled BOOLEAN NOT NULL, - withdraw_fee NUMERIC NOT NULL, - withdraw_min NUMERIC NOT NULL + CREATE TABLE IF NOT EXISTS wallet_sync_records ( + id SERIAL PRIMARY KEY, + time TIMESTAMP NOT NULL, + success BOOLEAN NOT NULL, + error VARCHAR(100) ); CREATE TABLE IF NOT EXISTS chain_sync_deposits ( id SERIAL PRIMARY KEY, @@ -98,20 +96,19 @@ class PostgresConfig(db: DatabaseClient) { depositor VARCHAR(72) NOT NULL, depositorMemo VARCHAR(72) ); - CREATE TABLE IF NOT EXISTS wallet_sync_schedules ( - id SERIAL PRIMARY KEY, - retry_time TIMESTAMP NOT NULL, - delay INTEGER NOT NULL, - batch_size INTEGER + CREATE TABLE IF NOT EXISTS currency ( + symbol VARCHAR(72) PRIMARY KEY, + name VARCHAR(72) NOT NULL ); - CREATE TABLE IF NOT EXISTS wallet_sync_records ( - id SERIAL PRIMARY KEY, + CREATE TABLE IF NOT EXISTS currency_implementations ( + symbol VARCHAR(72) PRIMARY KEY, chain VARCHAR(72) NOT NULL, - time TIMESTAMP NOT NULL, - endpoint_url VARCHAR(72) NOT NULL, - latest_block INTEGER, - success BOOLEAN NOT NULL, - error VARCHAR(100) + token BOOLEAN NOT NULL, + token_address VARCHAR(72), + token_name VARCHAR(72), + withdraw_enabled BOOLEAN NOT NULL, + withdraw_fee NUMERIC NOT NULL, + withdraw_min NUMERIC NOT NULL ); """ } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt index 7ca748374..3947af2b7 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt @@ -13,10 +13,7 @@ data class WalletSyncScheduleModel( @Table("wallet_sync_records") data class WalletSyncRecordModel( @Id val id: Long?, - val chain: String, val time: LocalDateTime, - val endpointUrl: String, - val latestBlock: Long?, val success: Boolean, val error: String? ) From fc6c2f4b113fc31c9f60610d0a9f97e11f5ec6fc Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 13:44:47 +0430 Subject: [PATCH 19/24] Add wallet sync repositories --- .../postgres/dao/WalletSyncDepositRepository.kt | 10 ++++++++++ .../postgres/dao/WalletSyncRecordRepository.kt | 9 +++++++++ 2 files changed, 19 insertions(+) create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt new file mode 100644 index 000000000..ab7f8cb72 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncDepositModel +import kotlinx.coroutines.flow.Flow +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface WalletSyncDepositRepository : ReactiveCrudRepository diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt new file mode 100644 index 000000000..031c4f98c --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncRecordModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface WalletSyncRecordRepository : ReactiveCrudRepository From 7c8d0ec2705bdeccbe09f1eeca50b1f094e25757 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 14:12:43 +0430 Subject: [PATCH 20/24] Add single row constraint to wallet sync schedules --- .../nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index bfa28b593..a79b095ad 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -66,7 +66,7 @@ class PostgresConfig(db: DatabaseClient) { error VARCHAR(100) ); CREATE TABLE IF NOT EXISTS wallet_sync_schedules ( - id SERIAL PRIMARY KEY, + id INTEGER PRIMARY KEY DEFAULT(1) CHECK(id = 1), retry_time TIMESTAMP NOT NULL, delay INTEGER NOT NULL, batch_size INTEGER From 83d50e927c9f7837aea0a5041f0d300322c72345 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 14:36:41 +0430 Subject: [PATCH 21/24] Implement wallet sync scheduler handler --- .../postgres/dao/WalletSyncScheduleRepository.kt | 8 +++++++- .../impl/WalletSyncSchedulerHandlerImpl.kt | 15 +++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt index 9c7e282d5..12913c8c7 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt @@ -1,8 +1,14 @@ package co.nilin.opex.port.bcgateway.postgres.dao import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono +import java.time.LocalDateTime @Repository -interface WalletSyncScheduleRepository : ReactiveCrudRepository \ No newline at end of file +interface WalletSyncScheduleRepository : ReactiveCrudRepository { + @Query("select * from wallet_sync_schedules where retryTime <= :time") + fun findActiveSchedule(time: LocalDateTime): Mono +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt index d4ca29623..ec3fa6206 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt @@ -2,16 +2,23 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.WalletSyncSchedule import co.nilin.opex.bcgateway.core.spi.WalletSyncSchedulerHandler +import co.nilin.opex.port.bcgateway.postgres.dao.WalletSyncScheduleRepository +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitSingleOrNull import org.springframework.stereotype.Component import java.time.LocalDateTime @Component -class WalletSyncSchedulerHandlerImpl: WalletSyncSchedulerHandler { +class WalletSyncSchedulerHandlerImpl(private val walletSyncScheduleRepository: WalletSyncScheduleRepository) : + WalletSyncSchedulerHandler { override suspend fun fetchActiveSchedule(time: LocalDateTime): WalletSyncSchedule? { - TODO("Not yet implemented") + val dao = walletSyncScheduleRepository.findActiveSchedule(time).awaitSingleOrNull() + return if (dao !== null) WalletSyncSchedule(dao.retryTime, dao.delay, dao.batchSize) else null } override suspend fun prepareScheduleForNextTry(syncSchedule: WalletSyncSchedule, time: LocalDateTime) { - TODO("Not yet implemented") + val dao = WalletSyncScheduleModel(1, time, syncSchedule.delay, syncSchedule.batchSize) + walletSyncScheduleRepository.save(dao).awaitFirst() } -} \ No newline at end of file +} From 737b3145a02dab9a4757d059daaadf1f86d12306 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 18:45:25 +0430 Subject: [PATCH 22/24] Implement deposit sync state --- .../opex/bcgateway/core/model/Deposit.kt | 1 + .../opex/bcgateway/core/model/WalletSync.kt | 2 +- .../core/service/WalletSyncServiceImpl.kt | 11 +++++- .../core/spi/WalletSyncRecordHandler.kt | 4 ++- .../postgres/config/PostgresConfig.kt | 13 ++----- .../dao/ChainSyncDepositRepository.kt | 11 ------ .../postgres/dao/DepositRepository.kt | 24 +++++++++++++ .../dao/WalletSyncDepositRepository.kt | 10 ------ .../dao/WalletSyncRecordRepository.kt | 1 - .../impl/ChainSyncRecordHandlerImpl.kt | 21 +++--------- .../impl/WalletSyncRecordHandlerImpl.kt | 34 +++++++++++++++---- ...ainSyncDepositModel.kt => DepositModel.kt} | 6 ++-- 12 files changed, 76 insertions(+), 62 deletions(-) delete mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt create mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt delete mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt rename BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/{ChainSyncDepositModel.kt => DepositModel.kt} (74%) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt index f2263ff72..d355d6df3 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt @@ -3,6 +3,7 @@ package co.nilin.opex.bcgateway.core.model import java.math.BigDecimal data class Deposit( + val id: Long?, val depositor: String, val depositorMemo: String?, val amount: BigDecimal, diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt index fd4eb056c..439e5e053 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt @@ -4,5 +4,5 @@ import java.time.LocalDateTime data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?) data class WalletSyncRecord( - val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: List + val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit ) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt index 77228ea44..977202000 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt @@ -1,6 +1,7 @@ package co.nilin.opex.bcgateway.core.service import co.nilin.opex.bcgateway.core.api.WalletSyncService +import co.nilin.opex.bcgateway.core.model.WalletSyncRecord import co.nilin.opex.bcgateway.core.spi.* import kotlinx.coroutines.ExecutorCoroutineDispatcher import kotlinx.coroutines.async @@ -27,8 +28,16 @@ class WalletSyncServiceImpl( async(dispatcher) { val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo) if (uuid != null) { - val symbol = currencyLoader.findSymbol(deposit.chain!!, deposit.tokenAddress) + val symbol = currencyLoader.findSymbol(deposit.chain, deposit.tokenAddress) if (symbol != null) walletProxy.transfer(uuid, symbol, deposit.amount) + walletSyncRecordHandler.saveWalletSyncRecord( + WalletSyncRecord( + LocalDateTime.now(), + true, + null, + deposit + ) + ) } } } diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt index 18437efe9..90979ab28 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt @@ -1,8 +1,10 @@ package co.nilin.opex.bcgateway.core.spi import co.nilin.opex.bcgateway.core.model.Deposit +import co.nilin.opex.bcgateway.core.model.WalletSyncRecord interface WalletSyncRecordHandler { suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) + suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) suspend fun findReadyToSyncTransfers(count: Long?): List -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index a79b095ad..df4ca430b 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -77,16 +77,7 @@ class PostgresConfig(db: DatabaseClient) { success BOOLEAN NOT NULL, error VARCHAR(100) ); - CREATE TABLE IF NOT EXISTS chain_sync_deposits ( - id SERIAL PRIMARY KEY, - chain VARCHAR(72) NOT NULL, - token BOOLEAN NOT NULL, - token_address VARCHAR(72), - amount NUMERIC NOT NULL, - depositor VARCHAR(72) NOT NULL, - depositorMemo VARCHAR(72) - ); - CREATE TABLE IF NOT EXISTS wallet_sync_deposits ( + CREATE TABLE IF NOT EXISTS deposits ( id SERIAL PRIMARY KEY, wallet_sync_record INTEGER NOT NULL, chain VARCHAR(72) NOT NULL, @@ -94,7 +85,7 @@ class PostgresConfig(db: DatabaseClient) { token_address VARCHAR(72), amount NUMERIC NOT NULL, depositor VARCHAR(72) NOT NULL, - depositorMemo VARCHAR(72) + depositor_memo VARCHAR(72) ); CREATE TABLE IF NOT EXISTS currency ( symbol VARCHAR(72) PRIMARY KEY, diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt deleted file mode 100644 index 04d5e1237..000000000 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncDepositRepository.kt +++ /dev/null @@ -1,11 +0,0 @@ -package co.nilin.opex.port.bcgateway.postgres.dao - -import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel -import kotlinx.coroutines.flow.Flow -import org.springframework.data.repository.reactive.ReactiveCrudRepository -import org.springframework.stereotype.Repository - -@Repository -interface ChainSyncDepositRepository : ReactiveCrudRepository { - fun findByChain(chain: String): Flow -} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt new file mode 100644 index 000000000..848d4d364 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt @@ -0,0 +1,24 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import kotlinx.coroutines.flow.Flow +import org.springframework.data.r2dbc.repository.Modifying +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono + +@Repository +interface DepositRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Flow + + @Query("select * from deposits where chain = :chain and wallet_sync_record is null") + fun findByChainWhereNotSynced(chain: String): Flow + + @Query("select * from deposits where wallet_record_id is null limit :count") + fun findLimited(count: Long?): Flow + + @Modifying + @Query("update deposits set wallet_sync_record = :walletSyncRecord where id = :id") + fun updateWalletSyncRecord(id: Long, walletSyncRecord: Long): Mono +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt deleted file mode 100644 index ab7f8cb72..000000000 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncDepositRepository.kt +++ /dev/null @@ -1,10 +0,0 @@ -package co.nilin.opex.port.bcgateway.postgres.dao - -import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel -import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncDepositModel -import kotlinx.coroutines.flow.Flow -import org.springframework.data.repository.reactive.ReactiveCrudRepository -import org.springframework.stereotype.Repository - -@Repository -interface WalletSyncDepositRepository : ReactiveCrudRepository diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt index 031c4f98c..cab986087 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt @@ -1,7 +1,6 @@ package co.nilin.opex.port.bcgateway.postgres.dao import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncRecordModel -import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt index 37bfb4571..2a2cf7799 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt @@ -5,8 +5,7 @@ import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository -import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncDepositRepository -import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel +import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncRecordModel import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList @@ -18,13 +17,13 @@ import org.springframework.transaction.annotation.Transactional @Component class ChainSyncRecordHandlerImpl( private val chainSyncRecordRepository: ChainSyncRecordRepository, - private val chainSyncDepositRepository: ChainSyncDepositRepository + private val depositRepository: DepositRepository ) : ChainSyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() return if (chainSyncRecordDao !== null) { - val deposits = chainSyncDepositRepository.findByChain(chainName).map { - Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) + val deposits = depositRepository.findByChainWhereNotSynced(chainName).map { + Deposit(it.id, it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) } ChainSyncRecord( chainSyncRecordDao.chain, @@ -52,17 +51,5 @@ class ChainSyncRecordHandlerImpl( syncRecord.error ) chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() - val depositsDao = syncRecord.records.map { - ChainSyncDepositModel( - null, - it.depositor, - it.depositorMemo, - it.amount, - it.chain, - it.token, - it.tokenAddress - ) - } - chainSyncDepositRepository.saveAll(depositsDao).awaitFirst() } } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt index 69e884da3..bcbc78854 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt @@ -1,21 +1,28 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.Deposit +import co.nilin.opex.bcgateway.core.model.WalletSyncRecord import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler -import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncDepositRepository -import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncDepositModel +import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository +import co.nilin.opex.port.bcgateway.postgres.dao.WalletSyncRecordRepository +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncRecordModel +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @Component class WalletSyncRecordHandlerImpl( - private val chainSyncDepositRepository: ChainSyncDepositRepository + private val walletSyncRecordRepository: WalletSyncRecordRepository, + private val depositRepository: DepositRepository ) : WalletSyncRecordHandler { @Transactional override suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) { val depositsDao = deposits.map { - ChainSyncDepositModel( + DepositModel( + it.id, null, it.depositor, it.depositorMemo, @@ -25,10 +32,25 @@ class WalletSyncRecordHandlerImpl( it.tokenAddress ) } - chainSyncDepositRepository.saveAll(depositsDao).awaitFirst() + depositRepository.saveAll(depositsDao).awaitFirst() + } + + @Transactional + override suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) { + val dao = walletSyncRecordRepository.save( + WalletSyncRecordModel( + null, + syncRecord.time, + syncRecord.success, + syncRecord.error + ) + ).awaitFirst() + depositRepository.updateWalletSyncRecord(syncRecord.deposit.id!!, dao.id!!).awaitFirst() } override suspend fun findReadyToSyncTransfers(count: Long?): List { - TODO("Not yet implemented") + return depositRepository.findLimited(count).map { + Deposit(it.id, it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) + }.toList() } } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt similarity index 74% rename from BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt rename to BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt index b36e36196..617eec478 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncDepositModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt @@ -1,13 +1,13 @@ package co.nilin.opex.port.bcgateway.postgres.model import org.springframework.data.annotation.Id -import org.springframework.data.relational.core.mapping.Column import org.springframework.data.relational.core.mapping.Table import java.math.BigDecimal -@Table("chain_sync_deposits") -data class ChainSyncDepositModel( +@Table("deposits") +data class DepositModel( @Id val id: Long?, + val walletSyncRecord: Long?, val depositor: String, val depositorMemo: String?, val amount: BigDecimal, From c068fc7a3ef12d4bddad532ac36d6b0d55ff9668 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 19:17:05 +0430 Subject: [PATCH 23/24] Remove wallet sync deposit model --- .../dao/WalletSyncScheduleRepository.kt | 2 +- .../postgres/model/WalletSyncDepositModel.kt | 17 ----------------- 2 files changed, 1 insertion(+), 18 deletions(-) delete mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt index 12913c8c7..176ff6a23 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt @@ -9,6 +9,6 @@ import java.time.LocalDateTime @Repository interface WalletSyncScheduleRepository : ReactiveCrudRepository { - @Query("select * from wallet_sync_schedules where retryTime <= :time") + @Query("select * from wallet_sync_schedules where retry_time <= :time") fun findActiveSchedule(time: LocalDateTime): Mono } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt deleted file mode 100644 index bde6b818c..000000000 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncDepositModel.kt +++ /dev/null @@ -1,17 +0,0 @@ -package co.nilin.opex.port.bcgateway.postgres.model - -import org.springframework.data.annotation.Id -import org.springframework.data.relational.core.mapping.Table -import java.math.BigDecimal - -@Table("wallet_sync_deposits") -data class WalletSyncDepositModel( - @Id val id: Long?, - val walletSyncRecord: Long, - val depositor: String, - val depositorMemo: String?, - val amount: BigDecimal, - val chain: String, - val token: Boolean, - val tokenAddress: String? -) From c8c1b22ebb27c97803af8e53c0a0e016df44f235 Mon Sep 17 00:00:00 2001 From: metalicn20 Date: Tue, 14 Sep 2021 19:28:00 +0430 Subject: [PATCH 24/24] Remove chain sync scheduler handler --- .../dao/ChainSyncScheduleRepository.kt | 10 ++- .../impl/ChainSyncSchedulerHandlerImpl.kt | 18 ++++- .../postgres/impl/SyncRecordHandlerImpl.kt | 68 ------------------- 3 files changed, 24 insertions(+), 72 deletions(-) delete mode 100644 BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt index 213ecb5b2..c4395e8d5 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt @@ -1,8 +1,16 @@ package co.nilin.opex.port.bcgateway.postgres.dao import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncScheduleModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import kotlinx.coroutines.flow.Flow +import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono +import java.time.LocalDateTime @Repository -interface ChainSyncScheduleRepository : ReactiveCrudRepository +interface ChainSyncScheduleRepository : ReactiveCrudRepository { + @Query("select * from chain_sync_schedules where retry_time <= :time") + fun findActiveSchedule(time: LocalDateTime): Flow +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt index c9a9f34a7..f33961d16 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt @@ -1,17 +1,29 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule +import co.nilin.opex.bcgateway.core.model.WalletSyncSchedule import co.nilin.opex.bcgateway.core.spi.ChainSyncSchedulerHandler +import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncScheduleRepository +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncScheduleModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitSingleOrNull import org.springframework.stereotype.Component import java.time.LocalDateTime @Component -class ChainSyncSchedulerHandlerImpl: ChainSyncSchedulerHandler { +class ChainSyncSchedulerHandlerImpl(private val chainSyncScheduleRepository: ChainSyncScheduleRepository) : + ChainSyncSchedulerHandler { override suspend fun fetchActiveSchedules(time: LocalDateTime): List { - TODO("Not yet implemented") + return chainSyncScheduleRepository.findActiveSchedule(time).map { + ChainSyncSchedule(it.chain, it.retryTime, it.delay) + }.toList() } override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) { - TODO("Not yet implemented") + val dao = ChainSyncScheduleModel(syncSchedule.chainName, time, syncSchedule.delay) + chainSyncScheduleRepository.save(dao).awaitFirst() } } diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt deleted file mode 100644 index 319b8e4c0..000000000 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ /dev/null @@ -1,68 +0,0 @@ -package co.nilin.opex.port.bcgateway.postgres.impl - -import co.nilin.opex.bcgateway.core.model.ChainSyncRecord -import co.nilin.opex.bcgateway.core.model.Deposit -import co.nilin.opex.bcgateway.core.model.Endpoint -import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler -import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository -import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository -import co.nilin.opex.port.bcgateway.postgres.model.DepositModel -import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.reactive.awaitFirst -import kotlinx.coroutines.reactive.awaitSingleOrNull -import org.springframework.stereotype.Component -import org.springframework.transaction.annotation.Transactional - -@Component -class SyncRecordHandlerImpl( - private val chainSyncRecordRepository: ChainSyncRecordRepository, - private val depositRepository: DepositRepository -) : SyncRecordHandler { - override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { - val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() - return if (chainSyncRecordDao !== null) { - val deposits = depositRepository.findByChain(chainName).map { - Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) - } - ChainSyncRecord( - chainSyncRecordDao.chain, - chainSyncRecordDao.time, - Endpoint(chainSyncRecordDao.endpointUrl), - chainSyncRecordDao.latestBlock, - chainSyncRecordDao.success, - chainSyncRecordDao.error, - deposits.toList() - ) - } else { - null - } - } - - @Transactional - override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { - val chainSyncRecordDao = - SyncRecordModel( - syncRecord.chainName, - syncRecord.time, - syncRecord.endpoint.url, - syncRecord.latestBlock, - syncRecord.success, - syncRecord.error - ) - chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() - val depositsDao = syncRecord.records.map { - DepositModel( - null, - it.depositor, - it.depositorMemo, - it.amount, - it.chain, - it.token, - it.tokenAddress - ) - } - depositRepository.saveAll(depositsDao).awaitFirst() - } -} \ No newline at end of file