Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 11 additions & 33 deletions Deployment/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,87 +56,65 @@ services:
- 127.0.0.1:8289:8089
- 127.0.0.1:1146:1044
networks:
opex-dev:
aliases:
- accountant
- opex-dev
eventlog:
ports:
- 127.0.0.1:8290:8090
networks:
opex-dev:
aliases:
- eventlog
- opex-dev
matching-engine:
ports:
- 127.0.0.1:8292:8092
- 127.0.0.1:1246:1044
networks:
opex-dev:
aliases:
- matching-engine
- opex-dev
matching-gateway:
ports:
- 127.0.0.1:8293:8093
- 127.0.0.1:1147:1044
networks:
opex-dev:
aliases:
- matching-gateway
- opex-dev
auth:
ports:
- 127.0.0.1:8283:8083
- 127.0.0.1:1148:1044
networks:
opex-dev:
aliases:
- auth
- opex-dev
wallet:
ports:
- 127.0.0.1:8291:8091
- 127.0.0.1:1149:1044
networks:
opex-dev:
aliases:
- wallet
- opex-dev
api:
ports:
- 127.0.0.1:8294:8094
- 127.0.0.1:1150:1044
networks:
opex-dev:
aliases:
- api
- opex-dev
websocket:
ports:
- 127.0.0.1:8297:8097
- 127.0.0.1:1154:1044
networks:
opex-dev:
aliases:
- websocket
- opex-dev
bc-gateway:
ports:
- 127.0.0.1:8295:8095
- 127.0.0.1:1152:1044
networks:
opex-dev:
aliases:
- bc-gateway
- opex-dev
storage:
ports:
- 127.0.0.1:8296:8096
- 127.0.0.1:1153:1044
networks:
opex-dev:
aliases:
- storage
- opex-dev
nginx:
ports:
- 8086:80
networks:
opex-dev:
aliases:
- opex_nginx
- opex-dev
networks:
opex-dev:
driver: bridge
29 changes: 1 addition & 28 deletions Deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ services:
- $DATA/zookeeper_data:/bitnami
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- opex
deploy:
restart_policy:
condition: on-failure
Expand All @@ -21,17 +19,13 @@ services:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
depends_on:
- zookeeper
networks:
- opex
deploy:
restart_policy:
condition: on-failure
consul:
image: 'consul'
environment:
- CONSUL_BIND_INTERFACE=eth0
networks:
- opex
deploy:
restart_policy:
condition: on-failure
Expand All @@ -43,8 +37,6 @@ services:
- $DATA/redis.conf:/usr/local/etc/redis/redis.conf
environment:
- REDIS_REPLICATION_MODE=master
networks:
- opex
deploy:
restart_policy:
condition: on-failure
Expand All @@ -56,18 +48,14 @@ services:
- POSTGRES_DB=opex_accountant
volumes:
- $DATA/accountant-data:/var/lib/postgresql/data/
networks:
- opex
postgres-eventlog:
image: "postgres"
environment:
- POSTGRES_USER=opex
- POSTGRES_PASSWORD=hiopex
- POSTGRES_DB=opex_eventlog
volumes:
- $DATA/runtime/eventlog-data:/var/lib/postgresql/data/
networks:
- opex
- $DATA/eventlog-data:/var/lib/postgresql/data/
postgres-auth:
image: "postgres"
environment:
Expand All @@ -76,8 +64,6 @@ services:
- POSTGRES_DB=opex_auth
volumes:
- $DATA/auth-data:/var/lib/postgresql/data/
networks:
- opex
deploy:
restart_policy:
condition: on-failure
Expand All @@ -89,8 +75,6 @@ services:
- POSTGRES_DB=opex_wallet
volumes:
- $DATA/wallet-data:/var/lib/postgresql/data/
networks:
- opex
deploy:
restart_policy:
condition: on-failure
Expand All @@ -102,8 +86,6 @@ services:
- POSTGRES_DB=opex_api
volumes:
- $DATA/api-data:/var/lib/postgresql/data/
networks:
- opex
deploy:
restart_policy:
condition: on-failure
Expand All @@ -115,8 +97,6 @@ services:
- POSTGRES_DB=opex_bc_gateway
volumes:
- $DATA/bc-gateway-data:/var/lib/postgresql/data/
networks:
- opex
deploy:
restart_policy:
condition: on-failure
Expand All @@ -131,10 +111,6 @@ services:
- REDIS_HOST=redis
- CONSUL_HOST=consul
- DB_IP_PORT=postgres-accountant
networks:
opex:
aliases:
- accountant
depends_on:
- zookeeper
- kafka
Expand Down Expand Up @@ -308,6 +284,3 @@ services:
- auth
- matching-gateway
- api
networks:
opex:
driver: bridge
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AppConfig {
chainEndpointProxyFinder: ChainEndpointProxyFinder,
chainSyncRecordHandler: ChainSyncRecordHandler,
walletSyncRecordHandler: WalletSyncRecordHandler,
chainSyncRetryHandler: ChainSyncRetryHandler,
currencyLoader: CurrencyLoader,
operator: TransactionalOperator
): ChainSyncService {
Expand All @@ -40,6 +41,7 @@ class AppConfig {
chainEndpointProxyFinder,
chainSyncRecordHandler,
walletSyncRecordHandler,
chainSyncRetryHandler,
currencyLoader,
operator,
AppDispatchers.chainSyncExecutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.time.LocalDateTime

data class Endpoint(val url: String)
data class Chain(val name: String, val addressTypes: List<AddressType>, val endpoints: List<Endpoint>)
data class ChainSyncSchedule(val chainName: String, val retryTime: LocalDateTime, val delay: Long)
data class ChainSyncSchedule(val chainName: String, val retryTime: LocalDateTime, val delay: Long, val errorDelay: Long)
data class ChainSyncRecord(
val chainName: String,
val time: LocalDateTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ open class ChainSyncServiceImpl(
private val chainEndpointProxyFinder: ChainEndpointProxyFinder,
private val chainSyncRecordHandler: ChainSyncRecordHandler,
private val walletSyncRecordHandler: WalletSyncRecordHandler,
private val chainSyncRetryHandler: ChainSyncRetryHandler,
private val currencyLoader: CurrencyLoader,
private val operator: TransactionalOperator,
private val dispatcher: ExecutorCoroutineDispatcher
Expand All @@ -28,28 +29,31 @@ open class ChainSyncServiceImpl(
withContext(coroutineContext) {
val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime())
schedules.map { syncSchedule ->
logger.info("chain syncing for: ${syncSchedule.chainName}")
async(dispatcher) {
val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName)
val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName)
val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName)
.map { impl -> impl.tokenAddress ?: "" }
.toList()

logger.info("chain syncing for: ${syncSchedule.chainName} - block: ${lastSync?.latestBlock}")
val syncResult =
syncHandler.syncTransfers(
ChainEndpointProxy.DepositFilter(
lastSync?.latestBlock, null, tokens
)
)

if (syncResult.success)
logger.info("request successful - synced ${syncSchedule.chainName} until ${syncResult.latestBlock}")
else
logger.info("request failed - ${syncResult.error}")

operator.executeAndAwait {
walletSyncRecordHandler.saveReadyToSyncTransfers(syncResult.chainName, syncResult.records)
chainSyncRecordHandler.saveSyncRecord(syncResult)
if (syncResult.success) {
chainSyncSchedulerHandler.prepareScheduleForNextTry(
syncSchedule,
currentTime().plus(syncSchedule.delay, ChronoUnit.SECONDS)
)
}
chainSyncSchedulerHandler.prepareScheduleForNextTry(syncSchedule, syncResult.success)
chainSyncRetryHandler.handleNextTry(syncSchedule, syncResult, lastSync?.latestBlock ?: 0)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package co.nilin.opex.bcgateway.core.spi

import co.nilin.opex.bcgateway.core.model.ChainSyncRecord
import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule

interface ChainSyncRetryHandler {

suspend fun handleNextTry(syncSchedule: ChainSyncSchedule, records: ChainSyncRecord, sentBlock:Long)

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import java.time.LocalDateTime

interface ChainSyncSchedulerHandler {
suspend fun fetchActiveSchedules(time: LocalDateTime): List<ChainSyncSchedule>
suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, time: LocalDateTime)
suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, success:Boolean)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import java.util.concurrent.Executors

internal class ChainSyncServiceImplTest {

val ethChain = "ETH_MAINNET"
val bscChain = "BSC_MAINNET"
val time = LocalDateTime.now()
val syncService: ChainSyncServiceImpl
private val ethChain = "ETH_MAINNET"
private val bscChain = "BSC_MAINNET"
private val time = LocalDateTime.now()
private val syncService: ChainSyncServiceImpl

@Mock
lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler
Expand All @@ -35,10 +35,13 @@ internal class ChainSyncServiceImplTest {
@Mock
lateinit var walletSyncRecordHandler: WalletSyncRecordHandler

@Mock
lateinit var chainSyncRetryHandler: ChainSyncRetryHandler

@Mock
lateinit var currencyLoader: CurrencyLoader

val endpointProxy: ChainEndpointProxy = mock()
private val endpointProxy: ChainEndpointProxy = mock()

init {
MockitoAnnotations.openMocks(this)
Expand All @@ -53,6 +56,7 @@ internal class ChainSyncServiceImplTest {
chainEndpointProxyFinder,
chainSyncRecordHandler,
walletSyncRecordHandler,
chainSyncRetryHandler,
currencyLoader,
OPERATOR,
Executors.newFixedThreadPool(2).asCoroutineDispatcher()
Expand Down Expand Up @@ -85,7 +89,7 @@ internal class ChainSyncServiceImplTest {
runBlocking {
//given
val delay = 100L
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
val syncSchedule = ChainSyncSchedule(ethChain, time, delay, delay)
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
.thenReturn(listOf(syncSchedule))
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
Expand All @@ -100,10 +104,7 @@ internal class ChainSyncServiceImplTest {
//then
verify(chainSyncRecordHandler).saveSyncRecord(any())
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(
syncSchedule,
time.plus(delay, ChronoUnit.SECONDS)
)
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, true)
}
}

Expand All @@ -112,7 +113,7 @@ internal class ChainSyncServiceImplTest {
runBlocking {
//given
val delay = 100L
val syncSchedule = ChainSyncSchedule(ethChain, time, delay)
val syncSchedule = ChainSyncSchedule(ethChain, time, delay, delay)
Mockito.`when`(chainSyncSchedulerHandler.fetchActiveSchedules(any()))
.thenReturn(listOf(syncSchedule))
Mockito.`when`(endpointProxy.syncTransfers(any())).thenReturn(
Expand All @@ -127,7 +128,7 @@ internal class ChainSyncServiceImplTest {
//then
verify(chainSyncRecordHandler).saveSyncRecord(any())
verify(walletSyncRecordHandler).saveReadyToSyncTransfers(any(), any())
verify(chainSyncSchedulerHandler, times(0)).prepareScheduleForNextTry(any(), any())
verify(chainSyncSchedulerHandler).prepareScheduleForNextTry(syncSchedule, false)
}
}

Expand Down
Loading