Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ff3f0a8
Add SyncRecordHandler, WalletSyncRecordHandler raw implementations
ebrahimmfadae Sep 11, 2021
1c3e13c
Merge branch 'feature/1-MVP' into iss26-implement-chain-sync
ebrahimmfadae Sep 11, 2021
4d5e31e
Fix some optional columns in postgres module
ebrahimmfadae Sep 12, 2021
ea4be34
Implement SyncRecordHandlerImpl.loadLastSuccessRecord() without recor…
ebrahimmfadae Sep 12, 2021
8ae1fba
Add deposits table and repository
ebrahimmfadae Sep 12, 2021
d7b60b4
Add records to sync record handler
ebrahimmfadae Sep 12, 2021
26e30b6
Add token flag to deposit
ebrahimmfadae Sep 12, 2021
865ddbf
Implement SyncRecordHandlerImpl.saveSyncRecord()
ebrahimmfadae Sep 12, 2021
0630720
Add transactional saving ability to sync records
ebrahimmfadae Sep 12, 2021
075b908
Organize sync record imports
ebrahimmfadae Sep 12, 2021
ba26622
Implement saveReadyToSyncTransfers
ebrahimmfadae Sep 12, 2021
91c9cb3
Fix SyncModel table names
ebrahimmfadae Sep 12, 2021
23d36cc
Add wallet sync schedule table and repository
ebrahimmfadae Sep 12, 2021
3725299
Improve column types
ebrahimmfadae Sep 12, 2021
e62758d
Merge branch 'iss26-implement-chain-sync' into iss28-implement-wallet…
ebrahimmfadae Sep 13, 2021
83c76c0
Update wallet sync record
ebrahimmfadae Sep 14, 2021
22d31dd
Update sync record names
ebrahimmfadae Sep 14, 2021
fc54dba
Separate chain sync and wallet sync deposits
ebrahimmfadae Sep 14, 2021
9d6f5e2
Make chain a required property in deposit
ebrahimmfadae Sep 14, 2021
a63193b
Fix wallet sync record model
ebrahimmfadae Sep 14, 2021
fc6c2f4
Add wallet sync repositories
ebrahimmfadae Sep 14, 2021
7c8d0ec
Add single row constraint to wallet sync schedules
ebrahimmfadae Sep 14, 2021
83d50e9
Implement wallet sync scheduler handler
ebrahimmfadae Sep 14, 2021
737b314
Implement deposit sync state
ebrahimmfadae Sep 14, 2021
51d800f
Merge branch 'feature/1-MVP' into iss28-implement-wallet-sync
ebrahimmfadae Sep 14, 2021
c068fc7
Remove wallet sync deposit model
ebrahimmfadae Sep 14, 2021
c8c1b22
Remove chain sync scheduler handler
ebrahimmfadae Sep 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ class AppConfig {

@Bean
fun chainSyncService(
syncSchedulerHandler: SyncSchedulerHandler,
chainSyncSchedulerHandler: ChainSyncSchedulerHandler,
chainEndpointProxyFinder: ChainEndpointProxyFinder,
syncRecordHandler: SyncRecordHandler,
chainSyncRecordHandler: ChainSyncRecordHandler,
walletSyncRecordHandler: WalletSyncRecordHandler,
currencyLoader: CurrencyLoader,
operator: TransactionalOperator
): ChainSyncService {
return ChainSyncServiceImpl(
syncSchedulerHandler,
chainSyncSchedulerHandler,
chainEndpointProxyFinder,
syncRecordHandler,
chainSyncRecordHandler,
walletSyncRecordHandler,
currencyLoader,
operator,
Expand All @@ -61,4 +61,4 @@ class AppConfig {
fun infoService(): InfoService {
return InfoServiceImpl()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package co.nilin.opex.bcgateway.core.model

import java.math.BigDecimal
import java.time.LocalDateTime

data class Endpoint(val url: String)
Expand All @@ -15,4 +14,3 @@ data class ChainSyncRecord(
val error: String?,
val records: List<Deposit>
)

Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package co.nilin.opex.bcgateway.core.model
import java.math.BigDecimal

data class Deposit(
val id: Long?,
val depositor: String,
val depositorMemo: String?,
val amount: BigDecimal,
val chain: String?,
val chain: String,
val token: Boolean,
val tokenAddress: String?
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import java.time.LocalDateTime
data class WalletSyncSchedule(val retryTime: LocalDateTime, val delay: Long, val batchSize: Long?)
data class WalletSyncRecord(
val time: LocalDateTime, val success: Boolean, val error: String?, val deposit: Deposit
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import java.time.temporal.ChronoUnit
import kotlin.coroutines.coroutineContext

open class ChainSyncServiceImpl(
private val syncSchedulerHandler: SyncSchedulerHandler,
private val chainSyncSchedulerHandler: ChainSyncSchedulerHandler,
private val chainEndpointProxyFinder: ChainEndpointProxyFinder,
private val syncRecordHandler: SyncRecordHandler,
private val chainSyncRecordHandler: ChainSyncRecordHandler,
private val walletSyncRecordHandler: WalletSyncRecordHandler,
private val currencyLoader: CurrencyLoader,
private val operator: TransactionalOperator,
Expand All @@ -21,11 +21,11 @@ open class ChainSyncServiceImpl(

override suspend fun startSyncWithChain() {
withContext(coroutineContext) {
val schedules = syncSchedulerHandler.fetchActiveSchedules(currentTime())
val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime())
schedules.map { syncSchedule ->
async(dispatcher) {
val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName)
val lastSync = syncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName)
val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName)
val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName)
.map { impl -> impl.tokenAddress!! }
.toList()
Expand All @@ -37,9 +37,9 @@ open class ChainSyncServiceImpl(
)
operator.executeAndAwait {
walletSyncRecordHandler.saveReadyToSyncTransfers(syncResult.chainName, syncResult.records)
syncRecordHandler.saveSyncRecord(syncResult)
chainSyncRecordHandler.saveSyncRecord(syncResult)
if (syncResult.success) {
syncSchedulerHandler.prepareScheduleForNextTry(
chainSyncSchedulerHandler.prepareScheduleForNextTry(
syncSchedule,
currentTime().plus(syncSchedule.delay, ChronoUnit.SECONDS)
)
Expand All @@ -51,4 +51,4 @@ open class ChainSyncServiceImpl(
}

protected open fun currentTime() = LocalDateTime.now()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package co.nilin.opex.bcgateway.core.service

import co.nilin.opex.bcgateway.core.api.WalletSyncService
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord
import co.nilin.opex.bcgateway.core.spi.*
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.async
Expand All @@ -27,8 +28,16 @@ class WalletSyncServiceImpl(
async(dispatcher) {
val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo)
if (uuid != null) {
val symbol = currencyLoader.findSymbol(deposit.chain!!, deposit.tokenAddress)
val symbol = currencyLoader.findSymbol(deposit.chain, deposit.tokenAddress)
if (symbol != null) walletProxy.transfer(uuid, symbol, deposit.amount)
walletSyncRecordHandler.saveWalletSyncRecord(
WalletSyncRecord(
LocalDateTime.now(),
true,
null,
deposit
)
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package co.nilin.opex.bcgateway.core.spi

import co.nilin.opex.bcgateway.core.model.ChainSyncRecord

interface SyncRecordHandler {
interface ChainSyncRecordHandler {
suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord?
suspend fun saveSyncRecord(syncRecord: ChainSyncRecord)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package co.nilin.opex.bcgateway.core.spi
import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule
import java.time.LocalDateTime

interface SyncSchedulerHandler {
interface ChainSyncSchedulerHandler {
suspend fun fetchActiveSchedules(time: LocalDateTime): List<ChainSyncSchedule>
suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime)
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package co.nilin.opex.bcgateway.core.spi

import co.nilin.opex.bcgateway.core.model.Deposit
import co.nilin.opex.bcgateway.core.model.WalletSyncRecord

interface WalletSyncRecordHandler {
suspend fun saveReadyToSyncTransfers(chainName: String, deposits: List<Deposit>)
suspend fun saveWalletSyncRecord(syncRecord: WalletSyncRecord)
suspend fun findReadyToSyncTransfers(count: Long?): List<Deposit>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import co.nilin.opex.bcgateway.core.model.Endpoint
import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy
import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxyFinder
import co.nilin.opex.bcgateway.core.spi.CurrencyLoader
import co.nilin.opex.bcgateway.core.spi.SyncRecordHandler
import co.nilin.opex.bcgateway.core.spi.SyncSchedulerHandler
import co.nilin.opex.bcgateway.core.spi.ChainSyncRecordHandler
import co.nilin.opex.bcgateway.core.spi.ChainSyncSchedulerHandler
import co.nilin.opex.bcgateway.core.spi.WalletSyncRecordHandler
import co.nilin.opex.bcgateway.test.OPERATOR
import java.time.LocalDateTime
Expand All @@ -23,7 +23,6 @@ import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions
import org.mockito.kotlin.verifyZeroInteractions

internal class ChainSyncServiceImplTest {
Expand All @@ -34,13 +33,13 @@ internal class ChainSyncServiceImplTest {
val syncService: ChainSyncServiceImpl

@Mock
lateinit var syncSchedulerHandler: SyncSchedulerHandler
lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler

@Mock
lateinit var chainEndpointProxyFinder: ChainEndpointProxyFinder

@Mock
lateinit var syncRecordHandler: SyncRecordHandler
lateinit var chainSyncRecordHandler: ChainSyncRecordHandler

@Mock
lateinit var walletSyncRecordHandler: WalletSyncRecordHandler
Expand All @@ -59,9 +58,9 @@ internal class ChainSyncServiceImplTest {
}

syncService = object : ChainSyncServiceImpl(
syncSchedulerHandler,
chainSyncSchedulerHandler,
chainEndpointProxyFinder,
syncRecordHandler,
chainSyncRecordHandler,
walletSyncRecordHandler,
currencyLoader,
OPERATOR,
Expand All @@ -75,15 +74,15 @@ internal class ChainSyncServiceImplTest {
fun givenNoActiveSchedules_whenStartSync_thenNoOp() {
runBlocking {
//given
Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList())
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())).thenReturn(emptyList())

//when
syncService.startSyncWithChain()

//then
verifyZeroInteractions(
chainEndpointProxyFinder,
syncRecordHandler,
chainSyncRecordHandler,
walletSyncRecordHandler,
currencyLoader
)
Expand All @@ -96,7 +95,7 @@ internal class ChainSyncServiceImplTest {
//given
val delay = 100L
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any()))
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
.thenReturn(listOf(syncSchedule))
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
ChainSyncRecord(
Expand All @@ -108,9 +107,9 @@ internal class ChainSyncServiceImplTest {
syncService.startSyncWithChain()

//then
verify(syncRecordHandler).saveSyncRecord(any())
verify(chainSyncRecordHandler).saveSyncRecord(any())
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
verify(syncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS))
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, time.plus(delay, ChronoUnit.SECONDS))
}
}

Expand All @@ -120,7 +119,7 @@ internal class ChainSyncServiceImplTest {
//given
val delay = 100L
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
Mockito.`when`(syncSchedulerHandler.fetchActiveSchedules(any()))
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
.thenReturn(listOf(syncSchedule))
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
ChainSyncRecord(
Expand All @@ -132,11 +131,11 @@ internal class ChainSyncServiceImplTest {
syncService.startSyncWithChain()

//then
verify(syncRecordHandler).saveSyncRecord(any())
verify(chainSyncRecordHandler).saveSyncRecord(any())
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
verify(syncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any())
verify(chainSyncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any())
}
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class PostgresConfig(db: DatabaseClient) {
CREATE TABLE IF NOT EXISTS chain_sync_schedules (
chain VARCHAR(72) PRIMARY KEY,
retry_time TIMESTAMP NOT NULL,
delay NUMERIC NOT NULL
delay INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS chain_sync_records (
chain VARCHAR(72) PRIMARY KEY,
Expand All @@ -65,6 +65,28 @@ class PostgresConfig(db: DatabaseClient) {
success BOOLEAN NOT NULL,
error VARCHAR(100)
);
CREATE TABLE IF NOT EXISTS wallet_sync_schedules (
id INTEGER PRIMARY KEY DEFAULT(1) CHECK(id = 1),
retry_time TIMESTAMP NOT NULL,
delay INTEGER NOT NULL,
batch_size INTEGER
);
CREATE TABLE IF NOT EXISTS wallet_sync_records (
id SERIAL PRIMARY KEY,
time TIMESTAMP NOT NULL,
success BOOLEAN NOT NULL,
error VARCHAR(100)
);
CREATE TABLE IF NOT EXISTS deposits (
id SERIAL PRIMARY KEY,
wallet_sync_record INTEGER NOT NULL,
chain VARCHAR(72) NOT NULL,
token BOOLEAN NOT NULL,
token_address VARCHAR(72),
amount NUMERIC NOT NULL,
depositor VARCHAR(72) NOT NULL,
depositor_memo VARCHAR(72)
);
CREATE TABLE IF NOT EXISTS currency (
symbol VARCHAR(72) PRIMARY KEY,
name VARCHAR(72) NOT NULL
Expand All @@ -79,15 +101,6 @@ class PostgresConfig(db: DatabaseClient) {
withdraw_fee NUMERIC NOT NULL,
withdraw_min NUMERIC NOT NULL
);
CREATE TABLE IF NOT EXISTS deposits (
id SERIAL PRIMARY KEY,
chain VARCHAR(72),
token BOOLEAN NOT NULL,
token_address VARCHAR(72),
amount NUMERIC NOT NULL,
depositor VARCHAR(72) NOT NULL,
depositorMemo VARCHAR(72)
);
"""
}
initDb // initialize the database
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package co.nilin.opex.port.bcgateway.postgres.dao

import co.nilin.opex.port.bcgateway.postgres.model.SyncRecordModel
import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncRecordModel
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono

@Repository
interface ChainSyncRecordRepository : ReactiveCrudRepository<SyncRecordModel, String> {
fun findByChain(chain: String): Mono<SyncRecordModel>
}
interface ChainSyncRecordRepository : ReactiveCrudRepository<ChainSyncRecordModel, String> {
fun findByChain(chain: String): Mono<ChainSyncRecordModel>
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package co.nilin.opex.port.bcgateway.postgres.dao

import co.nilin.opex.port.bcgateway.postgres.model.AssignedAddressChainModel
import co.nilin.opex.port.bcgateway.postgres.model.ChainModel
import co.nilin.opex.port.bcgateway.postgres.model.SyncScheduleModel
import co.nilin.opex.port.bcgateway.postgres.model.ChainSyncScheduleModel
import co.nilin.opex.port.bcgateway.postgres.model.WalletSyncScheduleModel
import kotlinx.coroutines.flow.Flow
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.query.Param
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono
import java.time.LocalDateTime

@Repository
interface ChainSyncScheduleRepository : ReactiveCrudRepository<SyncScheduleModel, String>
interface ChainSyncScheduleRepository : ReactiveCrudRepository<ChainSyncScheduleModel, String> {
@Query("select * from chain_sync_schedules where retry_time <= :time")
fun findActiveSchedule(time: LocalDateTime): Flow<ChainSyncScheduleModel>
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,23 @@ package co.nilin.opex.port.bcgateway.postgres.dao

import co.nilin.opex.port.bcgateway.postgres.model.DepositModel
import kotlinx.coroutines.flow.Flow
import org.springframework.data.r2dbc.repository.Modifying
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono

@Repository
interface DepositRepository : ReactiveCrudRepository<DepositModel, Long> {
fun findByChain(chain: String): Flow<DepositModel>
}

@Query("select * from deposits where chain = :chain and wallet_sync_record is null")
fun findByChainWhereNotSynced(chain: String): Flow<DepositModel>

@Query("select * from deposits where wallet_record_id is null limit :count")
fun findLimited(count: Long?): Flow<DepositModel>

@Modifying
@Query("update deposits set wallet_sync_record = :walletSyncRecord where id = :id")
fun updateWalletSyncRecord(id: Long, walletSyncRecord: Long): Mono<Int>
}
Loading