Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bridge/setu/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/client"
cliContext "github.com/cosmos/cosmos-sdk/client/context"
Expand Down Expand Up @@ -56,6 +57,16 @@ func NewTxBroadcaster(cdc *codec.Codec) *TxBroadcaster {
return &txBroadcaster
}

func (tb *TxBroadcaster) BroadcastToHeimdallWithDelay(msg sdk.Msg, delay time.Duration) error {
eta := time.Now().Add(delay)
timeAfterTrigger := time.After(delay)
tb.logger.Info("Tx sent on heimdall ", "msg", msg.Type(), "currentTime", time.Now(), "delayTime", eta)
select {
case <-timeAfterTrigger:
return tb.BroadcastToHeimdall(msg)
}
}

// BroadcastToHeimdall broadcast to heimdall
func (tb *TxBroadcaster) BroadcastToHeimdall(msg sdk.Msg) error {
tb.heimdallMutex.Lock()
Expand Down
32 changes: 19 additions & 13 deletions bridge/setu/processor/checkpointsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ func (cp *CheckpointProcessor) handleCheckpointSync() {
msg := checkpointTypes.NewMsgCheckpointSync(hmTypes.BytesToHeimdallAddress(helper.GetAddress()),
proposer, nextCheckpointNumber, start, end, rootChain)
// return broadcast to heimdall
if err := cp.txBroadcaster.BroadcastToHeimdall(msg); err != nil {
cp.Logger.Error("Error while broadcasting checkpoint-sync to heimdall",
"root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end, "error", err)
continue
if isCurrentValidator, delay := util.CalculateTaskDelay(cp.cliCtx); isCurrentValidator {
if err := cp.txBroadcaster.BroadcastToHeimdallWithDelay(msg, delay); err != nil {
cp.Logger.Error("Error while broadcasting checkpoint-sync to heimdall",
"root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end, "error", err)
continue
}
cp.Logger.Info("checkpoint sync transaction sent successfully",
"root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end)
}
cp.Logger.Info("checkpoint sync transaction sent successfully",
"root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end)
}
}
}
Expand Down Expand Up @@ -183,13 +185,17 @@ func (cp *CheckpointProcessor) sendCheckpointSyncAckToHeimdall(eventName string,
)
// fetch checkpoint sync buffer
bufferedCheckpoint, err := util.GetBufferedCheckpointSync(cp.cliCtx, checkpointChain)
if err == nil {
bufferedTime := time.Unix(int64(bufferedCheckpoint.TimeStamp), 0)
currentTime := time.Now().UTC()
if currentTime.Sub(bufferedTime).Seconds() > checkpointParams.CheckpointBufferTime.Seconds()/5 {
cp.Logger.Debug("checkpoint sync buffer has expired, ignore this ack")
return nil
}

if err != nil {
cp.Logger.Debug("checkpoint sync buffer has been cleared, this ack already submitted.")
return nil
}

bufferedTime := time.Unix(int64(bufferedCheckpoint.TimeStamp), 0)
currentTime := time.Now().UTC()
if currentTime.Sub(bufferedTime).Seconds() > checkpointParams.CheckpointBufferTime.Seconds()/5 {
cp.Logger.Debug("checkpoint sync buffer has expired, ignore this ack")
return nil
}

// create msg checkpoint ack message
Expand Down
21 changes: 12 additions & 9 deletions bridge/setu/util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ const (
TaskDelayBetweenEachVal = 24 * time.Second
RetryTaskDelay = 12 * time.Second

BridgeDBFlag = "bridge-db"
BridgeDBFlag = "bridge-db"
ProposersURLSizeLimit = 100
)

var logger log.Logger
Expand Down Expand Up @@ -146,21 +147,23 @@ func CalculateTaskDelay(cliCtx cliContext.CLIContext) (bool, time.Duration) {
// calculate validator position
valPosition := 0
isCurrentValidator := false
response, err := helper.FetchFromAPI(cliCtx, helper.GetHeimdallServerEndpoint(CurrentValidatorSetURL))

proposersURL := fmt.Sprintf(ProposersURL, ProposersURLSizeLimit)
proposersResponse, err := helper.FetchFromAPI(cliCtx, helper.GetHeimdallServerEndpoint(proposersURL))
if err != nil {
logger.Error("Unable to send request for current validatorset", "url", CurrentValidatorSetURL, "error", err)
logger.Error("Unable to send request for proposers ", "url", proposersURL, "error", err)
return isCurrentValidator, 0
}
// unmarshall data from buffer
var validatorSet hmtypes.ValidatorSet
err = json.Unmarshal(response.Result, &validatorSet)

var proposers []hmtypes.Validator
err = json.Unmarshal(proposersResponse.Result, &proposers)
if err != nil {
logger.Error("Error unmarshalling current validatorset data ", "error", err)
logger.Error("Error unmarshalling proposers data ", "error", err)
return isCurrentValidator, 0
}

logger.Info("Fetched current validatorset list", "currentValidatorcount", len(validatorSet.Validators))
for i, validator := range validatorSet.Validators {
logger.Info("Fetched proposers ", "currentValidatorsCount", len(proposers))
for i, validator := range proposers {
if bytes.Equal(validator.Signer.Bytes(), helper.GetAddress()) {
valPosition = i + 1
isCurrentValidator = true
Expand Down