diff --git a/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt b/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt index 58cc98ade..831ed2b07 100644 --- a/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt +++ b/BlockchainGateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt @@ -28,17 +28,17 @@ class AppConfig { @Bean fun chainSyncService( - syncSchedulerHandler: SyncSchedulerHandler, + chainSyncSchedulerHandler: ChainSyncSchedulerHandler, chainEndpointProxyFinder: ChainEndpointProxyFinder, - syncRecordHandler: SyncRecordHandler, + chainSyncRecordHandler: ChainSyncRecordHandler, walletSyncRecordHandler: WalletSyncRecordHandler, currencyLoader: CurrencyLoader, operator: TransactionalOperator ): ChainSyncService { return ChainSyncServiceImpl( - syncSchedulerHandler, + chainSyncSchedulerHandler, chainEndpointProxyFinder, - syncRecordHandler, + chainSyncRecordHandler, walletSyncRecordHandler, currencyLoader, operator, @@ -61,4 +61,4 @@ class AppConfig { fun infoService(): InfoService { return InfoServiceImpl() } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt index c591687f8..d88430445 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt @@ -1,6 +1,5 @@ package co.nilin.opex.bcgateway.core.model -import java.math.BigDecimal import java.time.LocalDateTime data class Endpoint(val url: String) @@ -15,4 +14,3 @@ data class ChainSyncRecord( val error: String?, val records: List ) - diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt index 9cdaee869..d355d6df3 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt @@ -3,10 +3,11 @@ package co.nilin.opex.bcgateway.core.model import java.math.BigDecimal data class Deposit( + val id: Long?, val depositor: String, val depositorMemo: String?, val amount: BigDecimal, - val chain: String?, + val chain: String, val token: Boolean, val tokenAddress: String? ) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt index 81fc4115d..439e5e053 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt @@ -5,4 +5,4 @@ import java.time.LocalDateTime data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?) data class WalletSyncRecord( val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit -) \ No newline at end of file +) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt index 9bfb97219..f8f8f4144 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt @@ -10,9 +10,9 @@ import java.time.temporal.ChronoUnit import kotlin.coroutines.coroutineContext open class ChainSyncServiceImpl( - private val syncSchedulerHandler: SyncSchedulerHandler, + private val chainSyncSchedulerHandler: ChainSyncSchedulerHandler, private val chainEndpointProxyFinder: ChainEndpointProxyFinder, - private val syncRecordHandler: SyncRecordHandler, + private val chainSyncRecordHandler: ChainSyncRecordHandler, private val walletSyncRecordHandler: WalletSyncRecordHandler, private val currencyLoader: CurrencyLoader, private val operator: TransactionalOperator, @@ -21,11 +21,11 @@ open class ChainSyncServiceImpl( override suspend fun startSyncWithChain() { withContext(coroutineContext) { - val schedules = syncSchedulerHandler.fetchActiveSchedules(currentTime()) + val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime()) schedules.map { syncSchedule -> async(dispatcher) { val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName) - val lastSync = syncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName) + val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName) val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName) .map { impl -> impl.tokenAddress!! } .toList() @@ -37,9 +37,9 @@ open class ChainSyncServiceImpl( ) operator.executeAndAwait { walletSyncRecordHandler.saveReadyToSyncTransfers(syncResult.chainName, syncResult.records) - syncRecordHandler.saveSyncRecord(syncResult) + chainSyncRecordHandler.saveSyncRecord(syncResult) if (syncResult.success) { - syncSchedulerHandler.prepareScheduleForNextTry( + chainSyncSchedulerHandler.prepareScheduleForNextTry( syncSchedule, currentTime().plus(syncSchedule.delay, ChronoUnit.SECONDS) ) @@ -51,4 +51,4 @@ open class ChainSyncServiceImpl( } protected open fun currentTime() = LocalDateTime.now() -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt index 77228ea44..977202000 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt @@ -1,6 +1,7 @@ package co.nilin.opex.bcgateway.core.service import co.nilin.opex.bcgateway.core.api.WalletSyncService +import co.nilin.opex.bcgateway.core.model.WalletSyncRecord import co.nilin.opex.bcgateway.core.spi.* import kotlinx.coroutines.ExecutorCoroutineDispatcher import kotlinx.coroutines.async @@ -27,8 +28,16 @@ class WalletSyncServiceImpl( async(dispatcher) { val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo) if (uuid != null) { - val symbol = currencyLoader.findSymbol(deposit.chain!!, deposit.tokenAddress) + val symbol = currencyLoader.findSymbol(deposit.chain, deposit.tokenAddress) if (symbol != null) walletProxy.transfer(uuid, symbol, deposit.amount) + walletSyncRecordHandler.saveWalletSyncRecord( + WalletSyncRecord( + LocalDateTime.now(), + true, + null, + deposit + ) + ) } } } diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncRecordHandler.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRecordHandler.kt similarity index 86% rename from BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncRecordHandler.kt rename to BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRecordHandler.kt index d7803c28a..e8cc7106a 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncRecordHandler.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRecordHandler.kt @@ -2,7 +2,7 @@ package co.nilin.opex.bcgateway.core.spi import co.nilin.opex.bcgateway.core.model.ChainSyncRecord -interface SyncRecordHandler { +interface ChainSyncRecordHandler { suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncSchedulerHandler.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt similarity index 88% rename from BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncSchedulerHandler.kt rename to BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt index be763513e..a04e498af 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/SyncSchedulerHandler.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt @@ -3,7 +3,7 @@ package co.nilin.opex.bcgateway.core.spi import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule import java.time.LocalDateTime -interface SyncSchedulerHandler { +interface ChainSyncSchedulerHandler { suspend fun fetchActiveSchedules(time: LocalDateTime): List suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt index 18437efe9..90979ab28 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/WalletSyncRecordHandler.kt @@ -1,8 +1,10 @@ package co.nilin.opex.bcgateway.core.spi import co.nilin.opex.bcgateway.core.model.Deposit +import co.nilin.opex.bcgateway.core.model.WalletSyncRecord interface WalletSyncRecordHandler { suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) + suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) suspend fun findReadyToSyncTransfers(count: Long?): List -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt b/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt index 75115b8f3..f9997357e 100644 --- a/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt +++ b/BlockchainGateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt @@ -6,8 +6,8 @@ import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxyFinder import co.nilin.opex.bcgateway.core.spi.CurrencyLoader -import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler -import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler +import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler +import co.nilin.opex.bcgateway.core.spi.ChainSyncSchedulerHandler import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler import co.nilin.opex.bcgateway.test.OPERATOR import java.time.LocalDateTime @@ -23,7 +23,6 @@ import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.verify -import org.mockito.kotlin.verifyNoMoreInteractions import org.mockito.kotlin.verifyZeroInteractions internal class ChainSyncServiceImplTest { @@ -34,13 +33,13 @@ internal class ChainSyncServiceImplTest { val syncService: ChainSyncServiceImpl @Mock - lateinit var syncSchedulerHandler: SyncSchedulerHandler + lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler @Mock lateinit var chainEndpointProxyFinder: ChainEndpointProxyFinder @Mock - lateinit var syncRecordHandler: SyncRecordHandler + lateinit var chainSyncRecordHandler: ChainSyncRecordHandler @Mock lateinit var walletSyncRecordHandler: WalletSyncRecordHandler @@ -59,9 +58,9 @@ internal class ChainSyncServiceImplTest { } syncService = object : ChainSyncServiceImpl( - syncSchedulerHandler, + chainSyncSchedulerHandler, chainEndpointProxyFinder, - syncRecordHandler, + chainSyncRecordHandler, walletSyncRecordHandler, currencyLoader, OPERATOR, @@ -75,7 +74,7 @@ internal class ChainSyncServiceImplTest { fun givenNoActiveSchedules_whenStartSync_thenNoOp() { runBlocking { //given - Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList()) + Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList()) //when syncService.startSyncWithChain() @@ -83,7 +82,7 @@ internal class ChainSyncServiceImplTest { //then verifyZeroInteractions( chainEndpointProxyFinder, - syncRecordHandler, + chainSyncRecordHandler, walletSyncRecordHandler, currencyLoader ) @@ -96,7 +95,7 @@ internal class ChainSyncServiceImplTest { //given val delay = 100L val syncSchedule = ChainSyncSchedule(ethChain, time, delay) - Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())) + Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())) .thenReturn(listOf(syncSchedule)) Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn( ChainSyncRecord( @@ -108,9 +107,9 @@ internal class ChainSyncServiceImplTest { syncService.startSyncWithChain() //then - verify(syncRecordHandler).saveSyncRecord(any()) + verify(chainSyncRecordHandler).saveSyncRecord(any()) verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any()) - verify(syncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS)) + verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS)) } } @@ -120,7 +119,7 @@ internal class ChainSyncServiceImplTest { //given val delay = 100L val syncSchedule = ChainSyncSchedule(ethChain, time, delay) - Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())) + Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())) .thenReturn(listOf(syncSchedule)) Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn( ChainSyncRecord( @@ -132,11 +131,11 @@ internal class ChainSyncServiceImplTest { syncService.startSyncWithChain() //then - verify(syncRecordHandler).saveSyncRecord(any()) + verify(chainSyncRecordHandler).saveSyncRecord(any()) verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any()) - verify(syncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any()) + verify(chainSyncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any()) } } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 14c3890df..df4ca430b 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -55,7 +55,7 @@ class PostgresConfig(db: DatabaseClient) { CREATE TABLE IF NOT EXISTS chain_sync_schedules ( chain VARCHAR(72) PRIMARY KEY, retry_time TIMESTAMP NOT NULL, - delay NUMERIC NOT NULL + delay INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS chain_sync_records ( chain VARCHAR(72) PRIMARY KEY, @@ -65,6 +65,28 @@ class PostgresConfig(db: DatabaseClient) { success BOOLEAN NOT NULL, error VARCHAR(100) ); + CREATE TABLE IF NOT EXISTS wallet_sync_schedules ( + id INTEGER PRIMARY KEY DEFAULT(1) CHECK(id = 1), + retry_time TIMESTAMP NOT NULL, + delay INTEGER NOT NULL, + batch_size INTEGER + ); + CREATE TABLE IF NOT EXISTS wallet_sync_records ( + id SERIAL PRIMARY KEY, + time TIMESTAMP NOT NULL, + success BOOLEAN NOT NULL, + error VARCHAR(100) + ); + CREATE TABLE IF NOT EXISTS deposits ( + id SERIAL PRIMARY KEY, + wallet_sync_record INTEGER NOT NULL, + chain VARCHAR(72) NOT NULL, + token BOOLEAN NOT NULL, + token_address VARCHAR(72), + amount NUMERIC NOT NULL, + depositor VARCHAR(72) NOT NULL, + depositor_memo VARCHAR(72) + ); CREATE TABLE IF NOT EXISTS currency ( symbol VARCHAR(72) PRIMARY KEY, name VARCHAR(72) NOT NULL @@ -79,15 +101,6 @@ class PostgresConfig(db: DatabaseClient) { withdraw_fee NUMERIC NOT NULL, withdraw_min NUMERIC NOT NULL ); - CREATE TABLE IF NOT EXISTS deposits ( - id SERIAL PRIMARY KEY, - chain VARCHAR(72), - token BOOLEAN NOT NULL, - token_address VARCHAR(72), - amount NUMERIC NOT NULL, - depositor VARCHAR(72) NOT NULL, - depositorMemo VARCHAR(72) - ); """ } initDb // initialize the database diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt index b0fa51f46..20044463e 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt @@ -1,11 +1,11 @@ package co.nilin.opex.port.bcgateway.postgres.dao -import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncRecordModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository import reactor.core.publisher.Mono @Repository -interface ChainSyncRecordRepository : ReactiveCrudRepository { - fun findByChain(chain: String): Mono -} \ No newline at end of file +interface ChainSyncRecordRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Mono +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt index 48a7a0252..c4395e8d5 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncScheduleRepository.kt @@ -1,13 +1,16 @@ package co.nilin.opex.port.bcgateway.postgres.dao -import co.nilin.opex.port.bcgateway.postgres.model.AssignedAddressChainModel -import co.nilin.opex.port.bcgateway.postgres.model.ChainModel -import co.nilin.opex.port.bcgateway.postgres.model.SyncScheduleModel +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncScheduleModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel import kotlinx.coroutines.flow.Flow import org.springframework.data.r2dbc.repository.Query -import org.springframework.data.repository.query.Param import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono +import java.time.LocalDateTime @Repository -interface ChainSyncScheduleRepository : ReactiveCrudRepository \ No newline at end of file +interface ChainSyncScheduleRepository : ReactiveCrudRepository { + @Query("select * from chain_sync_schedules where retry_time <= :time") + fun findActiveSchedule(time: LocalDateTime): Flow +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt index f5e21069b..848d4d364 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt @@ -2,10 +2,23 @@ package co.nilin.opex.port.bcgateway.postgres.dao import co.nilin.opex.port.bcgateway.postgres.model.DepositModel import kotlinx.coroutines.flow.Flow +import org.springframework.data.r2dbc.repository.Modifying +import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono @Repository interface DepositRepository : ReactiveCrudRepository { fun findByChain(chain: String): Flow -} \ No newline at end of file + + @Query("select * from deposits where chain = :chain and wallet_sync_record is null") + fun findByChainWhereNotSynced(chain: String): Flow + + @Query("select * from deposits where wallet_record_id is null limit :count") + fun findLimited(count: Long?): Flow + + @Modifying + @Query("update deposits set wallet_sync_record = :walletSyncRecord where id = :id") + fun updateWalletSyncRecord(id: Long, walletSyncRecord: Long): Mono +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt new file mode 100644 index 000000000..cab986087 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncRecordRepository.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncRecordModel +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface WalletSyncRecordRepository : ReactiveCrudRepository diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt new file mode 100644 index 000000000..176ff6a23 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/WalletSyncScheduleRepository.kt @@ -0,0 +1,14 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono +import java.time.LocalDateTime + +@Repository +interface WalletSyncScheduleRepository : ReactiveCrudRepository { + @Query("select * from wallet_sync_schedules where retry_time <= :time") + fun findActiveSchedule(time: LocalDateTime): Mono +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt similarity index 69% rename from BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt rename to BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt index 319b8e4c0..2a2cf7799 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncRecordHandlerImpl.kt @@ -3,11 +3,10 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.ChainSyncRecord import co.nilin.opex.bcgateway.core.model.Deposit import co.nilin.opex.bcgateway.core.model.Endpoint -import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler +import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository -import co.nilin.opex.port.bcgateway.postgres.model.DepositModel -import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncRecordModel import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst @@ -16,15 +15,15 @@ import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @Component -class SyncRecordHandlerImpl( +class ChainSyncRecordHandlerImpl( private val chainSyncRecordRepository: ChainSyncRecordRepository, private val depositRepository: DepositRepository -) : SyncRecordHandler { +) : ChainSyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() return if (chainSyncRecordDao !== null) { - val deposits = depositRepository.findByChain(chainName).map { - Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) + val deposits = depositRepository.findByChainWhereNotSynced(chainName).map { + Deposit(it.id, it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) } ChainSyncRecord( chainSyncRecordDao.chain, @@ -43,7 +42,7 @@ class SyncRecordHandlerImpl( @Transactional override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { val chainSyncRecordDao = - SyncRecordModel( + ChainSyncRecordModel( syncRecord.chainName, syncRecord.time, syncRecord.endpoint.url, @@ -52,17 +51,5 @@ class SyncRecordHandlerImpl( syncRecord.error ) chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() - val depositsDao = syncRecord.records.map { - DepositModel( - null, - it.depositor, - it.depositorMemo, - it.amount, - it.chain, - it.token, - it.tokenAddress - ) - } - depositRepository.saveAll(depositsDao).awaitFirst() } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt new file mode 100644 index 000000000..f33961d16 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/ChainSyncSchedulerHandlerImpl.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.port.bcgateway.postgres.impl + +import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule +import co.nilin.opex.bcgateway.core.model.WalletSyncSchedule +import co.nilin.opex.bcgateway.core.spi.ChainSyncSchedulerHandler +import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncScheduleRepository +import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncScheduleModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitSingleOrNull +import org.springframework.stereotype.Component +import java.time.LocalDateTime + +@Component +class ChainSyncSchedulerHandlerImpl(private val chainSyncScheduleRepository: ChainSyncScheduleRepository) : + ChainSyncSchedulerHandler { + override suspend fun fetchActiveSchedules(time: LocalDateTime): List { + return chainSyncScheduleRepository.findActiveSchedule(time).map { + ChainSyncSchedule(it.chain, it.retryTime, it.delay) + }.toList() + } + + override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) { + val dao = ChainSyncScheduleModel(syncSchedule.chainName, time, syncSchedule.delay) + chainSyncScheduleRepository.save(dao).awaitFirst() + } +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncSchedulerHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncSchedulerHandlerImpl.kt deleted file mode 100644 index 89c6e399b..000000000 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncSchedulerHandlerImpl.kt +++ /dev/null @@ -1,17 +0,0 @@ -package co.nilin.opex.port.bcgateway.postgres.impl - -import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule -import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler -import org.springframework.stereotype.Component -import java.time.LocalDateTime - -@Component -class SyncSchedulerHandlerImpl: SyncSchedulerHandler { - override suspend fun fetchActiveSchedules(time: LocalDateTime): List { - TODO("Not yet implemented") - } - - override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) { - TODO("Not yet implemented") - } -} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt index 3addb0f6f..c5bd769b8 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt @@ -1,21 +1,28 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.Deposit +import co.nilin.opex.bcgateway.core.model.WalletSyncRecord import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository +import co.nilin.opex.port.bcgateway.postgres.dao.WalletSyncRecordRepository import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncRecordModel +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @Component class WalletSyncRecordHandlerImpl( + private val walletSyncRecordRepository: WalletSyncRecordRepository, private val depositRepository: DepositRepository ) : WalletSyncRecordHandler { @Transactional override suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) { val depositsDao = deposits.map { DepositModel( + null, null, it.depositor, it.depositorMemo, @@ -28,7 +35,22 @@ class WalletSyncRecordHandlerImpl( depositRepository.saveAll(depositsDao).awaitFirst() } + @Transactional + override suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord) { + val dao = walletSyncRecordRepository.save( + WalletSyncRecordModel( + null, + syncRecord.time, + syncRecord.success, + syncRecord.error + ) + ).awaitFirst() + depositRepository.updateWalletSyncRecord(syncRecord.deposit.id!!, dao.id!!).awaitFirst() + } + override suspend fun findReadyToSyncTransfers(count: Long?): List { - TODO("Not yet implemented") + return depositRepository.findLimited(count).map { + Deposit(it.id, it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) + }.toList() } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt index d4ca29623..ec3fa6206 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncSchedulerHandlerImpl.kt @@ -2,16 +2,23 @@ package co.nilin.opex.port.bcgateway.postgres.impl import co.nilin.opex.bcgateway.core.model.WalletSyncSchedule import co.nilin.opex.bcgateway.core.spi.WalletSyncSchedulerHandler +import co.nilin.opex.port.bcgateway.postgres.dao.WalletSyncScheduleRepository +import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitSingleOrNull import org.springframework.stereotype.Component import java.time.LocalDateTime @Component -class WalletSyncSchedulerHandlerImpl: WalletSyncSchedulerHandler { +class WalletSyncSchedulerHandlerImpl(private val walletSyncScheduleRepository: WalletSyncScheduleRepository) : + WalletSyncSchedulerHandler { override suspend fun fetchActiveSchedule(time: LocalDateTime): WalletSyncSchedule? { - TODO("Not yet implemented") + val dao = walletSyncScheduleRepository.findActiveSchedule(time).awaitSingleOrNull() + return if (dao !== null) WalletSyncSchedule(dao.retryTime, dao.delay, dao.batchSize) else null } override suspend fun prepareScheduleForNextTry(syncSchedule: WalletSyncSchedule, time: LocalDateTime) { - TODO("Not yet implemented") + val dao = WalletSyncScheduleModel(1, time, syncSchedule.delay, syncSchedule.batchSize) + walletSyncScheduleRepository.save(dao).awaitFirst() } -} \ No newline at end of file +} diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncModel.kt similarity index 90% rename from BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt rename to BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncModel.kt index d7093e80a..c77b3da05 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/ChainSyncModel.kt @@ -6,12 +6,12 @@ import org.springframework.data.relational.core.mapping.Table import java.time.LocalDateTime @Table("chain_sync_schedules") -data class SyncScheduleModel( +data class ChainSyncScheduleModel( @Id @Column("chain") val chain: String, @Column("retry_time") val retryTime: LocalDateTime, val delay: Long ) @Table("chain_sync_records") -data class SyncRecordModel( +data class ChainSyncRecordModel( @Id @Column("chain") val chain: String, val time: LocalDateTime, @Column("endpoint_url") val endpointUrl: String, @@ -19,5 +19,3 @@ data class SyncRecordModel( val success: Boolean, val error: String? ) - - diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt index d79321a48..617eec478 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt @@ -1,17 +1,17 @@ package co.nilin.opex.port.bcgateway.postgres.model import org.springframework.data.annotation.Id -import org.springframework.data.relational.core.mapping.Column import org.springframework.data.relational.core.mapping.Table import java.math.BigDecimal @Table("deposits") data class DepositModel( @Id val id: Long?, + val walletSyncRecord: Long?, val depositor: String, val depositorMemo: String?, val amount: BigDecimal, - val chain: String?, + val chain: String, val token: Boolean, val tokenAddress: String? -) \ No newline at end of file +) diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt new file mode 100644 index 000000000..3947af2b7 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/WalletSyncModel.kt @@ -0,0 +1,19 @@ +package co.nilin.opex.port.bcgateway.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table +import java.time.LocalDateTime + +@Table("wallet_sync_schedules") +data class WalletSyncScheduleModel( + @Id val id: Long?, val retryTime: LocalDateTime, val delay: Long, val batchSize: Long? +) + +@Table("wallet_sync_records") +data class WalletSyncRecordModel( + @Id val id: Long?, + val time: LocalDateTime, + val success: Boolean, + val error: String? +)