diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt index e456669f7..94033e961 100644 --- a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt @@ -32,6 +32,7 @@ class AppConfig { chainEndpointProxyFinder: ChainEndpointProxyFinder, chainSyncRecordHandler: ChainSyncRecordHandler, walletSyncRecordHandler: WalletSyncRecordHandler, + chainSyncRetryHandler: ChainSyncRetryHandler, currencyLoader: CurrencyLoader, operator: TransactionalOperator ): ChainSyncService { @@ -40,6 +41,7 @@ class AppConfig { chainEndpointProxyFinder, chainSyncRecordHandler, walletSyncRecordHandler, + chainSyncRetryHandler, currencyLoader, operator, AppDispatchers.chainSyncExecutor diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt index d88430445..91f2c8c08 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/model/Chain.kt @@ -4,7 +4,7 @@ import java.time.LocalDateTime data class Endpoint(val url: String) data class Chain(val name: String, val addressTypes: List, val endpoints: List) -data class ChainSyncSchedule(val chainName: String, val retryTime: LocalDateTime, val delay: Long) +data class ChainSyncSchedule(val chainName: String, val retryTime: LocalDateTime, val delay: Long, val errorDelay: Long) data class ChainSyncRecord( val chainName: String, val time: LocalDateTime, diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt index 2f5bd9a21..d51df1618 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt @@ -17,6 +17,7 @@ open class ChainSyncServiceImpl( private val chainEndpointProxyFinder: ChainEndpointProxyFinder, private val chainSyncRecordHandler: ChainSyncRecordHandler, private val walletSyncRecordHandler: WalletSyncRecordHandler, + private val chainSyncRetryHandler: ChainSyncRetryHandler, private val currencyLoader: CurrencyLoader, private val operator: TransactionalOperator, private val dispatcher: ExecutorCoroutineDispatcher @@ -28,28 +29,31 @@ open class ChainSyncServiceImpl( withContext(coroutineContext) { val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime()) schedules.map { syncSchedule -> - logger.info("chain syncing for: ${syncSchedule.chainName}") async(dispatcher) { val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName) val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName) val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName) .map { impl -> impl.tokenAddress ?: "" } .toList() + + logger.info("chain syncing for: ${syncSchedule.chainName} - block: ${lastSync?.latestBlock}") val syncResult = syncHandler.syncTransfers( ChainEndpointProxy.DepositFilter( lastSync?.latestBlock, null, tokens ) ) + + if (syncResult.success) + logger.info("request successful - synced ${syncSchedule.chainName} until ${syncResult.latestBlock}") + else + logger.info("request failed - ${syncResult.error}") + operator.executeAndAwait { walletSyncRecordHandler.saveReadyToSyncTransfers(syncResult.chainName, syncResult.records) chainSyncRecordHandler.saveSyncRecord(syncResult) - if (syncResult.success) { - chainSyncSchedulerHandler.prepareScheduleForNextTry( - syncSchedule, - currentTime().plus(syncSchedule.delay, ChronoUnit.SECONDS) - ) - } + chainSyncSchedulerHandler.prepareScheduleForNextTry(syncSchedule, syncResult.success) + chainSyncRetryHandler.handleNextTry(syncSchedule, syncResult, lastSync?.latestBlock ?: 0) } } } diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRetryHandler.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRetryHandler.kt new file mode 100644 index 000000000..fe13da718 --- /dev/null +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncRetryHandler.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.bcgateway.core.spi + +import co.nilin.opex.bcgateway.core.model.ChainSyncRecord +import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule + +interface ChainSyncRetryHandler { + + suspend fun handleNextTry(syncSchedule: ChainSyncSchedule, records: ChainSyncRecord, sentBlock:Long) + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt index a04e498af..b798d037b 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt @@ -5,5 +5,5 @@ import java.time.LocalDateTime interface ChainSyncSchedulerHandler { suspend fun fetchActiveSchedules(time: LocalDateTime): List - suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) + suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, success:Boolean) } diff --git a/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt b/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt index 88f1c148a..258080a8e 100644 --- a/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt +++ b/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt @@ -18,10 +18,10 @@ import java.util.concurrent.Executors internal class ChainSyncServiceImplTest { - val ethChain = "ETH_MAINNET" - val bscChain = "BSC_MAINNET" - val time = LocalDateTime.now() - val syncService: ChainSyncServiceImpl + private val ethChain = "ETH_MAINNET" + private val bscChain = "BSC_MAINNET" + private val time = LocalDateTime.now() + private val syncService: ChainSyncServiceImpl @Mock lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler @@ -35,10 +35,13 @@ internal class ChainSyncServiceImplTest { @Mock lateinit var walletSyncRecordHandler: WalletSyncRecordHandler + @Mock + lateinit var chainSyncRetryHandler: ChainSyncRetryHandler + @Mock lateinit var currencyLoader: CurrencyLoader - val endpointProxy: ChainEndpointProxy = mock() + private val endpointProxy: ChainEndpointProxy = mock() init { MockitoAnnotations.openMocks(this) @@ -53,6 +56,7 @@ internal class ChainSyncServiceImplTest { chainEndpointProxyFinder, chainSyncRecordHandler, walletSyncRecordHandler, + chainSyncRetryHandler, currencyLoader, OPERATOR, Executors.newFixedThreadPool(2).asCoroutineDispatcher() @@ -85,7 +89,7 @@ internal class ChainSyncServiceImplTest { runBlocking { //given val delay = 100L - val syncSchedule = ChainSyncSchedule(ethChain, time, delay) + val syncSchedule = ChainSyncSchedule(ethChain, time, delay, delay) Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())) .thenReturn(listOf(syncSchedule)) Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn( @@ -100,10 +104,7 @@ internal class ChainSyncServiceImplTest { //then verify(chainSyncRecordHandler).saveSyncRecord(any()) verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any()) - verify(chainSyncSchedulerHandler).prepareScheduleForNextTry( - syncSchedule, - time.plus(delay, ChronoUnit.SECONDS) - ) + verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, true) } } @@ -112,7 +113,7 @@ internal class ChainSyncServiceImplTest { runBlocking { //given val delay = 100L - val syncSchedule = ChainSyncSchedule(ethChain, time, delay) + val syncSchedule = ChainSyncSchedule(ethChain, time, delay, delay) Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any())) .thenReturn(listOf(syncSchedule)) Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn( @@ -127,7 +128,7 @@ internal class ChainSyncServiceImplTest { //then verify(chainSyncRecordHandler).saveSyncRecord(any()) verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any()) - verify(chainSyncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any()) + verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, false) } } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-chain-proxy/src/main/kotlin/co/nilin/opex/bcgateway/ports/chainproxy/impl/ChainEndpointProxyImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-chain-proxy/src/main/kotlin/co/nilin/opex/bcgateway/ports/chainproxy/impl/ChainEndpointProxyImpl.kt index 5c1a27265..06094fdb5 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-chain-proxy/src/main/kotlin/co/nilin/opex/bcgateway/ports/chainproxy/impl/ChainEndpointProxyImpl.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-chain-proxy/src/main/kotlin/co/nilin/opex/bcgateway/ports/chainproxy/impl/ChainEndpointProxyImpl.kt @@ -31,9 +31,9 @@ class ChainEndpointProxyImpl( ) data class Transfer( - var txHash: String, - var from: String, - var to: String, + var txHash: String?, + var from: String?, + var to: String?, var isTokenTransfer: Boolean, var token: String? = null, var amount: BigDecimal @@ -47,7 +47,6 @@ class ChainEndpointProxyImpl( private val logger = LoggerFactory.getLogger(ChainEndpointProxyImpl::class.java) private suspend fun requestTransferList(endpoint: String, request: TransfersRequest): DepositResult { - logger.info("request transfers: base=$endpoint") val response = webClient.post() .uri(URI.create(endpoint)) .header("Content-Type", "application/json") @@ -60,7 +59,18 @@ class ChainEndpointProxyImpl( return DepositResult( response?.latestBlock ?: request.startBlock ?: 0, response?.transfers - ?.map { Deposit(null, it.txHash, it.to, null, it.amount, chain, it.isTokenTransfer, it.token) } + ?.map { + Deposit( + null, + it.txHash ?: "", + it.to ?: "", + null, + it.amount, + chain, + it.isTokenTransfer, + it.token + ) + } ?: emptyList() ) } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncRecordRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncRecordRepository.kt index 339a836cd..12da446f5 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncRecordRepository.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncRecordRepository.kt @@ -1,11 +1,24 @@ package co.nilin.opex.bcgateway.ports.postgres.dao import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRecordModel +import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository import reactor.core.publisher.Mono +import java.time.LocalDateTime @Repository interface ChainSyncRecordRepository : ReactiveCrudRepository { + + @Query("insert into chain_sync_records values(:chain, :time, :endpointUrl, :latestBlock, :success, :error)") + fun insert( + chain: String, + time: LocalDateTime, + endpointUrl: String, + latestBlock: Long?, + success: Boolean, + error: String? + ):Mono + fun findByChain(chain: String): Mono } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncRetryRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncRetryRepository.kt new file mode 100644 index 000000000..32073bef1 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncRetryRepository.kt @@ -0,0 +1,15 @@ +package co.nilin.opex.bcgateway.ports.postgres.dao + +import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRetryModel +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono + +@Repository +interface ChainSyncRetryRepository : ReactiveCrudRepository { + + @Query("select * from chain_sync_retry where chain = :chain and block = :block") + fun findByChainAndBlock(chain: String, block: Long): Mono + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncRecordHandlerImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncRecordHandlerImpl.kt index 02f130738..65755ec0d 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncRecordHandlerImpl.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncRecordHandlerImpl.kt @@ -10,6 +10,7 @@ import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRecordModel import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingleOrNull import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @@ -21,15 +22,16 @@ class ChainSyncRecordHandlerImpl( ) : ChainSyncRecordHandler { override suspend fun loadLastSuccessRecord(chainName: String): ChainSyncRecord? { val chainSyncRecordDao = chainSyncRecordRepository.findByChain(chainName).awaitSingleOrNull() - return if (chainSyncRecordDao !== null) { + return if (chainSyncRecordDao != null) { val deposits = depositRepository.findByChainWhereNotSynced(chainName).map { Deposit(it.id, it.hash, it.depositor, it.depositorMemo, it.amount, it.chain, it.token, it.tokenAddress) } + ChainSyncRecord( chainSyncRecordDao.chain, chainSyncRecordDao.time, Endpoint(chainSyncRecordDao.endpointUrl), - chainSyncRecordDao.latestBlock, + if (chainSyncRecordDao.latestBlock == null) 0 else chainSyncRecordDao.latestBlock + 1, chainSyncRecordDao.success, chainSyncRecordDao.error, deposits.toList() @@ -51,6 +53,17 @@ class ChainSyncRecordHandlerImpl( syncRecord.success, syncRecord.error ) - chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() + + if (currentRecord != null) + chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() + else + chainSyncRecordRepository.insert( + syncRecord.chainName, + syncRecord.time, + syncRecord.endpoint.url, + syncRecord.latestBlock ?: currentRecord?.latestBlock, + syncRecord.success, + syncRecord.error + ).awaitFirstOrNull() } } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncRetryHandlerImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncRetryHandlerImpl.kt new file mode 100644 index 000000000..548b74a93 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncRetryHandlerImpl.kt @@ -0,0 +1,66 @@ +package co.nilin.opex.bcgateway.ports.postgres.impl + +import co.nilin.opex.bcgateway.core.model.ChainSyncRecord +import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule +import co.nilin.opex.bcgateway.core.spi.ChainSyncRetryHandler +import co.nilin.opex.bcgateway.ports.postgres.dao.ChainSyncRecordRepository +import co.nilin.opex.bcgateway.ports.postgres.dao.ChainSyncRetryRepository +import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRecordModel +import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncRetryModel +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactive.awaitSingleOrNull +import org.springframework.stereotype.Component + +@Component +class ChainSyncRetryHandlerImpl( + private val chainSyncRetryRepository: ChainSyncRetryRepository, + private val chainSyncRecordRepository: ChainSyncRecordRepository, +) : ChainSyncRetryHandler { + + private val maxRetry = 5 + + override suspend fun handleNextTry(syncSchedule: ChainSyncSchedule, records: ChainSyncRecord, sentBlock: Long) { + val success = records.success + val chain = syncSchedule.chainName + + var retry = chainSyncRetryRepository.findByChainAndBlock(chain, sentBlock).awaitFirstOrNull() + if (success) { + if (retry != null) { + retry.apply { + retries += 1 + synced = true + } + chainSyncRetryRepository.save(retry).awaitFirst() + } + } else { + if (retry == null) { + retry = ChainSyncRetryModel(chain, sentBlock, error = records.error) + } else { + val shouldGiveUp = retry.retries >= maxRetry + retry.apply { + retries += 1 + error = records.error + giveUp = shouldGiveUp + } + } + + chainSyncRetryRepository.save(retry).awaitFirst() + + if (retry.giveUp) { + val record = chainSyncRecordRepository.findByChain(chain).awaitSingleOrNull() + if (record != null) { + val chainSyncRecordDao = ChainSyncRecordModel( + records.chainName, + records.time, + records.endpoint.url, + retry.block, + records.success, + records.error + ) + chainSyncRecordRepository.save(chainSyncRecordDao).awaitFirst() + } + } + } + } +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt index c1c4664f3..cc525f724 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt @@ -9,18 +9,25 @@ import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst import org.springframework.stereotype.Component import java.time.LocalDateTime +import java.time.temporal.ChronoUnit @Component class ChainSyncSchedulerHandlerImpl(private val chainSyncScheduleRepository: ChainSyncScheduleRepository) : ChainSyncSchedulerHandler { + override suspend fun fetchActiveSchedules(time: LocalDateTime): List { return chainSyncScheduleRepository.findActiveSchedule(time).map { - ChainSyncSchedule(it.chain, it.retryTime, it.delay) + ChainSyncSchedule(it.chain, it.retryTime, it.delay, it.errorDelay) }.toList() } - override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime) { - val dao = ChainSyncScheduleModel(syncSchedule.chainName, time, syncSchedule.delay) + override suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, success: Boolean) { + val chain = syncSchedule.chainName + val time = LocalDateTime.now().plus( + if (success) syncSchedule.delay else syncSchedule.errorDelay, + ChronoUnit.SECONDS + ) + val dao = ChainSyncScheduleModel(chain, time, syncSchedule.delay, syncSchedule.errorDelay) chainSyncScheduleRepository.save(dao).awaitFirst() } } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt index e294b57d6..92410fa9f 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt @@ -7,7 +7,14 @@ import java.time.LocalDateTime @Table("chain_sync_schedules") data class ChainSyncScheduleModel( - @Id @Column("chain") val chain: String, @Column("retry_time") val retryTime: LocalDateTime, val delay: Long + @Id + @Column("chain") + val chain: String, + @Column("retry_time") + val retryTime: LocalDateTime, + val delay: Long, + @Column("error_delay") + val errorDelay: Long ) @Table("chain_sync_records") diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncRetryModel.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncRetryModel.kt new file mode 100644 index 000000000..abf959a3f --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncRetryModel.kt @@ -0,0 +1,18 @@ +package co.nilin.opex.bcgateway.ports.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table + +@Table("chain_sync_retry") +class ChainSyncRetryModel( + val chain: String, + val block: Long, + var retries: Int = 1, + var synced: Boolean = false, + @Column("give_up") + var giveUp: Boolean = false, + var error: String? = null, + @Id + var id: Long? = null +) \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/data.sql b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/data.sql index 1835230d6..21208c2da 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/data.sql +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/data.sql @@ -55,8 +55,8 @@ VALUES INSERT INTO chain_sync_schedules VALUES - ('bitcoin', CURRENT_DATE, 600), - ('ethereum', CURRENT_DATE, 90) ON CONFLICT DO NOTHING; + ('bitcoin', CURRENT_DATE, 600, 60), + ('ethereum', CURRENT_DATE, 90, 60) ON CONFLICT DO NOTHING; INSERT INTO wallet_sync_schedules diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql index a7603ee27..6f445565f 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql @@ -47,11 +47,28 @@ CREATE TABLE IF NOT EXISTS chain_endpoints ( CREATE TABLE IF NOT EXISTS chain_sync_schedules ( chain VARCHAR(72) PRIMARY KEY REFERENCES chains (name), retry_time TIMESTAMP NOT NULL, - delay INTEGER NOT NULL + delay INTEGER NOT NULL, + error_delay INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS chain_sync_retry ( + id SERIAL PRIMARY KEY, + chain VARCHAR(72) REFERENCES chains (name), + block INTEGER NOT NULL, + retries INTEGER NOT NULL DEFAULT 1, + synced BOOLEAN NOT NULL DEFAULT false, + give_up BOOLEAN NOT NULL DEFAULT false, + error TEXT, + UNIQUE (chain, block) ); CREATE TABLE IF NOT EXISTS chain_sync_records ( - chain VARCHAR(72) PRIMARY KEY REFERENCES chains (name) + chain VARCHAR(72) PRIMARY KEY REFERENCES chains (name), + time TIMESTAMP NOT NULL, + endpoint_url VARCHAR(72) NOT NULL, + latest_block INTEGER, + success BOOLEAN NOT NULL, + error VARCHAR(100) ); CREATE TABLE IF NOT EXISTS wallet_sync_schedules (