Skip to content
7 changes: 6 additions & 1 deletion bridge/setu/listener/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package listener
import (
"context"
"math/big"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/client"
Expand Down Expand Up @@ -172,11 +173,15 @@ func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Dura
// the ending of the interval
ticker := time.NewTicker(firstInterval)

var tickerOnce sync.Once
// start listening
for {
select {
case <-ticker.C:
ticker.Reset(interval)
tickerOnce.Do(func() {
ticker.Reset(interval)
})

header, err := bl.chainClient.HeaderByNumber(ctx, nil)
if err == nil && header != nil {
// send data to channel
Expand Down
6 changes: 5 additions & 1 deletion bridge/setu/listener/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"strconv"
"sync"
"time"

stakingTypes "github.com/maticnetwork/heimdall/staking/types"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.
// the ending of the interval
ticker := time.NewTicker(firstInterval)

var tickerOnce sync.Once
// var eventTypes []string
// eventTypes = append(eventTypes, "message.action='checkpoint'")
// eventTypes = append(eventTypes, "message.action='event-record'")
Expand All @@ -83,7 +85,9 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.
for {
select {
case <-ticker.C:
ticker.Reset(interval)
tickerOnce.Do(func() {
ticker.Reset(interval)
})
fromBlock, toBlock, err := hl.fetchFromAndToBlock()
if err != nil {
hl.Logger.Error("Error fetching fromBlock and toBlock...skipping events query", "error", err)
Expand Down
6 changes: 5 additions & 1 deletion bridge/setu/listener/tron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"math/big"
"strconv"
"sync"
"time"

"github.com/RichardKnop/machinery/v1/tasks"
Expand Down Expand Up @@ -93,11 +94,14 @@ func (tl *TronListener) StartPolling(ctx context.Context, pollInterval time.Dura
// the ending of the interval
ticker := time.NewTicker(firstInterval)

var tickerOnce sync.Once
// start listening
for {
select {
case <-ticker.C:
ticker.Reset(interval)
tickerOnce.Do(func() {
ticker.Reset(interval)
})
headerNum, err := tl.contractConnector.GetTronLatestBlockNumber()
if err == nil {
// send data to channel
Expand Down
28 changes: 20 additions & 8 deletions bridge/setu/processor/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math"
"math/big"
"strconv"
"sync"
"time"

cliContext "github.com/cosmos/cosmos-sdk/client/context"
Expand Down Expand Up @@ -94,18 +95,30 @@ func (cp *CheckpointProcessor) RegisterTasks() {
}

func (cp *CheckpointProcessor) startPolling(ctx context.Context) {
tickerForNoAck := time.NewTicker(helper.GetConfig().NoACKPollInterval)
now := time.Now()
baseTime := time.Unix(0, 0)
// no-ack ticker interval keep same with checkpoint interval
noAckInterval := helper.GetConfig().CheckpointerPollInterval
// adjust no-ack ticker to tick at the middle of checkpoint interval
firstIntervalForNoAck := noAckInterval - (now.UTC().Sub(baseTime) % noAckInterval) - noAckInterval/2 // nolint: gomnd
if firstIntervalForNoAck <= 0 {
firstIntervalForNoAck += noAckInterval
}

tickerForNoAck := time.NewTicker(firstIntervalForNoAck)
syncInterval := helper.GetConfig().CheckpointerPollInterval / 2
tickerForSync := time.NewTicker(syncInterval)
// stop ticker when everything done
defer tickerForNoAck.Stop()
defer tickerForSync.Stop()

cp.Logger.Info("Start polling", "no-ack-interval", helper.GetConfig().NoACKPollInterval, "checkpoint-sync-interval", syncInterval)
cp.Logger.Info("Start polling", "no-ack-interval", noAckInterval, "checkpoint-sync-interval", syncInterval)

adjustOnce := sync.Once{}
for {
select {
case <-tickerForNoAck.C:
adjustOnce.Do(func() { tickerForNoAck.Reset(noAckInterval) })
go cp.handleCheckpointNoAck()
case <-tickerForSync.C:
go cp.handleCheckpointSync()
Expand Down Expand Up @@ -392,11 +405,11 @@ func (cp *CheckpointProcessor) handleCheckpointNoAck() {
return
}

isNoAckRequired, count := cp.checkIfNoAckIsRequired(checkpointContext, lastCreatedAt)
isNoAckRequired, _ := cp.checkIfNoAckIsRequired(checkpointContext, lastCreatedAt)
if isNoAckRequired {
var isProposer bool

if isProposer, err = util.IsProposerByIndex(cp.cliCtx, count); err != nil {
if isProposer, err = util.IsValidator(cp.cliCtx); err != nil {
cp.Logger.Error("Error checking IsProposer while proposing Checkpoint No-Ack ", "error", err)
return
}
Expand Down Expand Up @@ -797,10 +810,10 @@ func (cp *CheckpointProcessor) checkIfNoAckIsRequired(checkpointContext *Checkpo

checkpointCreationTime := time.Unix(lastCreatedAt, 0)
currentTime := time.Now().UTC()

timeDiff := currentTime.Sub(checkpointCreationTime)
// check if last checkpoint was < NoACK wait time
if timeDiff.Seconds() >= helper.GetConfig().NoACKWaitTime.Seconds() && index == 0 {
index = math.Floor(timeDiff.Seconds() / helper.GetConfig().NoACKWaitTime.Seconds())
if timeDiff.Seconds() >= helper.GetConfig().CheckpointerPollInterval.Seconds() && index == 0 {
index = math.Floor(timeDiff.Seconds() / helper.GetConfig().CheckpointerPollInterval.Seconds())
}

if index == 0 {
Expand All @@ -812,7 +825,6 @@ func (cp *CheckpointProcessor) checkIfNoAckIsRequired(checkpointContext *Checkpo

// check if difference between no-ack time and current time
lastNoAck := cp.getLastNoAckTime()

lastNoAckTime := time.Unix(int64(lastNoAck), 0)
// if last no ack == 0 , first no-ack to be sent
if currentTime.Sub(lastNoAckTime).Seconds() < checkpointParams.CheckpointBufferTime.Seconds() && lastNoAck != 0 {
Expand Down
28 changes: 28 additions & 0 deletions bridge/setu/util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,34 @@ func IsProposerByIndex(cliCtx cliContext.CLIContext, index uint64) (bool, error)
return false, nil
}

func IsValidator(cliCtx cliContext.CLIContext) (bool, error) {
var validatorSet hmtypes.ValidatorSet

result, err := helper.FetchFromAPI(cliCtx,
helper.GetHeimdallServerEndpoint(CurrentValidatorSetURL),
)
if err != nil {
logger.Error("Error fetching proposers", "url", CurrentValidatorSetURL, "error", err)

return false, fmt.Errorf("failed to query heimdall server: %w", err)
}

err = json.Unmarshal(result.Result, &validatorSet)
if err != nil {
logger.Error("error unmarshalling proposer slice", "error", err)

return false, fmt.Errorf("failed to unmarshal validatorSet: %w", err)
}

for _, validator := range validatorSet.Validators {
if bytes.Equal(validator.Signer.Bytes(), helper.GetAddress()) {
return true, nil
}
}

return false, nil
}

// IsInProposerList checks if we are in current proposer
func IsInProposerList(cliCtx cliContext.CLIContext, count uint64) (bool, error) {
logger.Debug("Skipping proposers", "count", strconv.FormatUint(count, 10))
Expand Down