Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: 1.13
go-version: 1.16
- name: "Build binaries"
run: make build
- name: "Run tests"
Expand Down
16 changes: 13 additions & 3 deletions bridge/setu/listener/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Listener interface {

StartHeaderProcess(context.Context)

StartPolling(context.Context, time.Duration)
StartPolling(context.Context, time.Duration, bool)

StartSubscription(context.Context, ethereum.Subscription)

Expand Down Expand Up @@ -156,23 +156,33 @@ func (bl *BaseListener) StartHeaderProcess(ctx context.Context) {
}

// startPolling starts polling
func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Duration) {
// needAlign is used to decide whether the ticker is align to 1970 UTC.
// if true, the ticker will always tick as it begins at 1970 UTC.
func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Duration, needAlign bool) {
// How often to fire the passed in function in second
interval := pollInterval
firstInterval := interval
if needAlign {
now := time.Now()
baseTime := time.Unix(0, 0)
firstInterval = interval - (now.UTC().Sub(baseTime) % interval)
}

// Setup the ticket and the channel to signal
// the ending of the interval
ticker := time.NewTicker(interval)
ticker := time.NewTicker(firstInterval)

// start listening
for {
select {
case <-ticker.C:
ticker.Reset(interval)
header, err := bl.chainClient.HeaderByNumber(ctx, nil)
if err == nil && header != nil {
// send data to channel
bl.HeaderChannel <- header
}

case <-ctx.Done():
bl.Logger.Info("Polling stopped")
ticker.Stop()
Expand Down
15 changes: 12 additions & 3 deletions bridge/setu/listener/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (hl *HeimdallListener) Start() error {
}

hl.Logger.Info("Start polling for events", "pollInterval", pollInterval)
hl.StartPolling(headerCtx, pollInterval)
hl.StartPolling(headerCtx, pollInterval, false)
return nil
}

Expand All @@ -57,13 +57,21 @@ func (hl *HeimdallListener) ProcessHeader(*types.Header) {
}

// StartPolling - starts polling for heimdall events
func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.Duration) {
// needAlign is used to decide whether the ticker is align to 1970 UTC.
// if true, the ticker will always tick as it begins at 1970 UTC.
func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.Duration, needAlign bool) {
// How often to fire the passed in function in second
interval := pollInterval
firstInterval := interval
if needAlign {
now := time.Now()
baseTime := time.Unix(0, 0)
firstInterval = interval - (now.UTC().Sub(baseTime) % interval)
}

// Setup the ticket and the channel to signal
// the ending of the interval
ticker := time.NewTicker(interval)
ticker := time.NewTicker(firstInterval)

// var eventTypes []string
// eventTypes = append(eventTypes, "message.action='checkpoint'")
Expand All @@ -75,6 +83,7 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.
for {
select {
case <-ticker.C:
ticker.Reset(interval)
fromBlock, toBlock, err := hl.fetchFromAndToBlock()
if err != nil {
hl.Logger.Error("Error fetching fromBlock and toBlock...skipping events query", "error", err)
Expand Down
2 changes: 1 addition & 1 deletion bridge/setu/listener/maticchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (ml *MaticChainListener) Start() error {
if err != nil {
// start go routine to poll for new header using client object
ml.Logger.Info("Start polling for header blocks", "pollInterval", helper.GetConfig().CheckpointerPollInterval)
go ml.StartPolling(ctx, helper.GetConfig().CheckpointerPollInterval)
go ml.StartPolling(ctx, helper.GetConfig().CheckpointerPollInterval, true)
} else {
// start go routine to listen new header using subscription
go ml.StartSubscription(ctx, subscription)
Expand Down
2 changes: 1 addition & 1 deletion bridge/setu/listener/rootchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (rl *RootChainListener) Start() error {
// start go routine to poll for new header using client object
rl.Logger.Info("Start polling for root chain header blocks",
"root", rl.rootChainType, "pollInterval", rl.pollInterval)
go rl.StartPolling(ctx, rl.pollInterval)
go rl.StartPolling(ctx, rl.pollInterval, false)
} else {
// start go routine to listen new header using subscription
go rl.StartSubscription(ctx, subscription)
Expand Down
27 changes: 19 additions & 8 deletions bridge/setu/listener/tron.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,41 @@ func (tl *TronListener) Start() error {

tl.Logger.Info("Start polling for events", "pollInterval", pollInterval)
// poll for new header using client object
go tl.StartPolling(headerCtx, pollInterval)
go tl.StartPolling(headerCtx, pollInterval, false)
return nil
}

// startPolling starts polling
func (tl *TronListener) StartPolling(ctx context.Context, pollInterval time.Duration) {
// needAlign is used to decide whether the ticker is align to 1970 UTC.
// if true, the ticker will always tick as it begins at 1970 UTC.
func (tl *TronListener) StartPolling(ctx context.Context, pollInterval time.Duration, needAlign bool) {
// How often to fire the passed in function in second
interval := pollInterval
firstInterval := interval

if needAlign {
now := time.Now()
baseTime := time.Unix(0, 0)
firstInterval = interval - (now.UTC().Sub(baseTime) % interval)
}

// Setup the ticket and the channel to signal
// the ending of the interval
ticker := time.NewTicker(interval)
ticker := time.NewTicker(firstInterval)

// start listening
for {
select {
case <-ticker.C:
ticker.Reset(interval)
headerNum, err := tl.contractConnector.GetTronLatestBlockNumber()
if err == nil {
// send data to channel
tl.HeaderChannel <- &(ethTypes.Header{
Number: big.NewInt(headerNum),
})
}

case <-ctx.Done():
tl.Logger.Info("Polling stopped")
ticker.Stop()
Expand Down Expand Up @@ -214,7 +225,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
// topup has to be processed first before validator join. so adding delay.
delay := util.TaskDelayBetweenEachVal
tl.sendTaskWithDelay("sendValidatorJoinToHeimdall", selectedEvent.Name, logBytes, delay)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
// topup has to be processed first before validator join. so adding delay.
delay = delay + util.TaskDelayBetweenEachVal
tl.sendTaskWithDelay("sendValidatorJoinToHeimdall", selectedEvent.Name, logBytes, delay)
Expand All @@ -237,7 +248,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if bytes.Equal(event.SignerPubkey, pubkeyBytes) {
tl.sendTaskWithDelay("sendSignerChangeToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendSignerChangeToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand All @@ -248,7 +259,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if util.IsEventSender(tl.cliCtx, event.ValidatorId.Uint64()) {
tl.sendTaskWithDelay("sendUnstakeInitToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendUnstakeInitToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand All @@ -264,7 +275,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if bytes.Equal(event.User.Bytes(), helper.GetAddress()) {
tl.sendTaskWithDelay("sendTopUpFeeToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendTopUpFeeToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand All @@ -280,7 +291,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if util.IsEventSender(tl.cliCtx, event.ValidatorId.Uint64()) {
tl.sendTaskWithDelay("sendUnjailToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendUnjailToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand Down
Loading