diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt index 579803a87..9cdaee869 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Deposit.kt @@ -2,4 +2,11 @@ package co.nilin.opex.bcgateway.core.model import java.math.BigDecimal -data class Deposit(val depositor: String, val depositorMemo: String?, val amount: BigDecimal, val chain: String?, val tokenAddress: String?) +data class Deposit( + val depositor: String, + val depositorMemo: String?, + val amount: BigDecimal, + val chain: String?, + val token: Boolean, + val tokenAddress: String? +) diff --git a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt index 3f4ec00fa..81fc4115d 100644 --- a/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt +++ b/BlockchainGateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/WalletSync.kt @@ -4,8 +4,5 @@ import java.time.LocalDateTime data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?) data class WalletSyncRecord( - val time: LocalDateTime - , val success: Boolean - , val error: String? - , val deposit: Deposit + val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit ) \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt index 0804207c9..14c3890df 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/config/PostgresConfig.kt @@ -54,14 +54,14 @@ class PostgresConfig(db: DatabaseClient) { ); CREATE TABLE IF NOT EXISTS chain_sync_schedules ( chain VARCHAR(72) PRIMARY KEY, - retry_time TIMESTAMP, - delay NUMERIC + retry_time TIMESTAMP NOT NULL, + delay NUMERIC NOT NULL ); CREATE TABLE IF NOT EXISTS chain_sync_records ( chain VARCHAR(72) PRIMARY KEY, time TIMESTAMP NOT NULL, endpoint_url VARCHAR(72) NOT NULL, - latest_block INTEGER NOT NULL, + latest_block INTEGER, success BOOLEAN NOT NULL, error VARCHAR(100) ); @@ -79,6 +79,15 @@ class PostgresConfig(db: DatabaseClient) { withdraw_fee NUMERIC NOT NULL, withdraw_min NUMERIC NOT NULL ); + CREATE TABLE IF NOT EXISTS deposits ( + id SERIAL PRIMARY KEY, + chain VARCHAR(72), + token BOOLEAN NOT NULL, + token_address VARCHAR(72), + amount NUMERIC NOT NULL, + depositor VARCHAR(72) NOT NULL, + depositorMemo VARCHAR(72) + ); """ } initDb // initialize the database diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt index b3f4ee2b0..b0fa51f46 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/ChainSyncRecordRepository.kt @@ -1,13 +1,11 @@ package co.nilin.opex.port.bcgateway.postgres.dao -import co.nilin.opex.port.bcgateway.postgres.model.AssignedAddressChainModel -import co.nilin.opex.port.bcgateway.postgres.model.ChainModel -import co.nilin.opex.port.bcgateway.postgres.model.SyncRecord -import kotlinx.coroutines.flow.Flow -import org.springframework.data.r2dbc.repository.Query -import org.springframework.data.repository.query.Param +import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono @Repository -interface ChainSyncRecordRepository : ReactiveCrudRepository \ No newline at end of file +interface ChainSyncRecordRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Mono +} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt new file mode 100644 index 000000000..f5e21069b --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/dao/DepositRepository.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.port.bcgateway.postgres.dao + +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import kotlinx.coroutines.flow.Flow +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface DepositRepository : ReactiveCrudRepository { + fun findByChain(chain: String): Flow +} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt new file mode 100644 index 000000000..319b8e4c0 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/SyncRecordHandlerImpl.kt @@ -0,0 +1,68 @@ +package co.nilin.opex.port.bcgateway.postgres.impl + +import co.nilin.opex.bcgateway.core.model.ChainSyncRecord +import co.nilin.opex.bcgateway.core.model.Deposit +import co.nilin.opex.bcgateway.core.model.Endpoint +import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler +import co.nilin.opex.port.bcgateway.postgres.dao.ChainSyncRecordRepository +import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitSingleOrNull +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Transactional + +@Component +class SyncRecordHandlerImpl( + private val chainSyncRecordRepository: ChainSyncRecordRepository, + private val depositRepository: DepositRepository +) : SyncRecordHandler { + override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { + val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() + return if (chainSyncRecordDao !== null) { + val deposits = depositRepository.findByChain(chainName).map { + Deposit(it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) + } + ChainSyncRecord( + chainSyncRecordDao.chain, + chainSyncRecordDao.time, + Endpoint(chainSyncRecordDao.endpointUrl), + chainSyncRecordDao.latestBlock, + chainSyncRecordDao.success, + chainSyncRecordDao.error, + deposits.toList() + ) + } else { + null + } + } + + @Transactional + override suspend fun saveSyncRecord(syncRecord: ChainSyncRecord) { + val chainSyncRecordDao = + SyncRecordModel( + syncRecord.chainName, + syncRecord.time, + syncRecord.endpoint.url, + syncRecord.latestBlock, + syncRecord.success, + syncRecord.error + ) + chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() + val depositsDao = syncRecord.records.map { + DepositModel( + null, + it.depositor, + it.depositorMemo, + it.amount, + it.chain, + it.token, + it.tokenAddress + ) + } + depositRepository.saveAll(depositsDao).awaitFirst() + } +} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt new file mode 100644 index 000000000..3addb0f6f --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/impl/WalletSyncRecordHandlerImpl.kt @@ -0,0 +1,34 @@ +package co.nilin.opex.port.bcgateway.postgres.impl + +import co.nilin.opex.bcgateway.core.model.Deposit +import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler +import co.nilin.opex.port.bcgateway.postgres.dao.DepositRepository +import co.nilin.opex.port.bcgateway.postgres.model.DepositModel +import kotlinx.coroutines.reactive.awaitFirst +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Transactional + +@Component +class WalletSyncRecordHandlerImpl( + private val depositRepository: DepositRepository +) : WalletSyncRecordHandler { + @Transactional + override suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List) { + val depositsDao = deposits.map { + DepositModel( + null, + it.depositor, + it.depositorMemo, + it.amount, + it.chain, + it.token, + it.tokenAddress + ) + } + depositRepository.saveAll(depositsDao).awaitFirst() + } + + override suspend fun findReadyToSyncTransfers(count: Long?): List { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt new file mode 100644 index 000000000..d79321a48 --- /dev/null +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/DepositModel.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.port.bcgateway.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table +import java.math.BigDecimal + +@Table("deposits") +data class DepositModel( + @Id val id: Long?, + val depositor: String, + val depositorMemo: String?, + val amount: BigDecimal, + val chain: String?, + val token: Boolean, + val tokenAddress: String? +) \ No newline at end of file diff --git a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt index a131264ce..d7093e80a 100644 --- a/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt +++ b/BlockchainGateway/bc-gateway-ports/bc-persister-postgres/src/main/kotlin/co/nilin/opex/port/bcgateway/postgres/model/SyncModel.kt @@ -5,13 +5,13 @@ import org.springframework.data.relational.core.mapping.Column import org.springframework.data.relational.core.mapping.Table import java.time.LocalDateTime -@Table("chain_sync_schedule") +@Table("chain_sync_schedules") data class SyncScheduleModel( @Id @Column("chain") val chain: String, @Column("retry_time") val retryTime: LocalDateTime, val delay: Long ) -@Table("chain_sync_record") -data class SyncRecord( +@Table("chain_sync_records") +data class SyncRecordModel( @Id @Column("chain") val chain: String, val time: LocalDateTime, @Column("endpoint_url") val endpointUrl: String,