diff --git a/bridge/setu/broadcaster/broadcaster.go b/bridge/setu/broadcaster/broadcaster.go index 377e1312..d2c06d81 100644 --- a/bridge/setu/broadcaster/broadcaster.go +++ b/bridge/setu/broadcaster/broadcaster.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/cosmos/cosmos-sdk/client" cliContext "github.com/cosmos/cosmos-sdk/client/context" @@ -56,6 +57,16 @@ func NewTxBroadcaster(cdc *codec.Codec) *TxBroadcaster { return &txBroadcaster } +func (tb *TxBroadcaster) BroadcastToHeimdallWithDelay(msg sdk.Msg, delay time.Duration) error { + eta := time.Now().Add(delay) + timeAfterTrigger := time.After(delay) + tb.logger.Info("Tx sent on heimdall ", "msg", msg.Type(), "currentTime", time.Now(), "delayTime", eta) + select { + case <-timeAfterTrigger: + return tb.BroadcastToHeimdall(msg) + } +} + // BroadcastToHeimdall broadcast to heimdall func (tb *TxBroadcaster) BroadcastToHeimdall(msg sdk.Msg) error { tb.heimdallMutex.Lock() diff --git a/bridge/setu/processor/checkpointsync.go b/bridge/setu/processor/checkpointsync.go index 3ed207fe..01b82d36 100644 --- a/bridge/setu/processor/checkpointsync.go +++ b/bridge/setu/processor/checkpointsync.go @@ -61,13 +61,15 @@ func (cp *CheckpointProcessor) handleCheckpointSync() { msg := checkpointTypes.NewMsgCheckpointSync(hmTypes.BytesToHeimdallAddress(helper.GetAddress()), proposer, nextCheckpointNumber, start, end, rootChain) // return broadcast to heimdall - if err := cp.txBroadcaster.BroadcastToHeimdall(msg); err != nil { - cp.Logger.Error("Error while broadcasting checkpoint-sync to heimdall", - "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end, "error", err) - continue + if isCurrentValidator, delay := util.CalculateTaskDelay(cp.cliCtx); isCurrentValidator { + if err := cp.txBroadcaster.BroadcastToHeimdallWithDelay(msg, delay); err != nil { + cp.Logger.Error("Error while broadcasting checkpoint-sync to heimdall", + "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end, "error", err) + continue + } + cp.Logger.Info("checkpoint sync transaction sent successfully", + "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end) } - cp.Logger.Info("checkpoint sync transaction sent successfully", - "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end) } } } @@ -183,13 +185,17 @@ func (cp *CheckpointProcessor) sendCheckpointSyncAckToHeimdall(eventName string, ) // fetch checkpoint sync buffer bufferedCheckpoint, err := util.GetBufferedCheckpointSync(cp.cliCtx, checkpointChain) - if err == nil { - bufferedTime := time.Unix(int64(bufferedCheckpoint.TimeStamp), 0) - currentTime := time.Now().UTC() - if currentTime.Sub(bufferedTime).Seconds() > checkpointParams.CheckpointBufferTime.Seconds()/5 { - cp.Logger.Debug("checkpoint sync buffer has expired, ignore this ack") - return nil - } + + if err != nil { + cp.Logger.Debug("checkpoint sync buffer has been cleared, this ack already submitted.") + return nil + } + + bufferedTime := time.Unix(int64(bufferedCheckpoint.TimeStamp), 0) + currentTime := time.Now().UTC() + if currentTime.Sub(bufferedTime).Seconds() > checkpointParams.CheckpointBufferTime.Seconds()/5 { + cp.Logger.Debug("checkpoint sync buffer has expired, ignore this ack") + return nil } // create msg checkpoint ack message diff --git a/bridge/setu/util/common.go b/bridge/setu/util/common.go index 7376fc9c..d387cd7f 100644 --- a/bridge/setu/util/common.go +++ b/bridge/setu/util/common.go @@ -63,7 +63,8 @@ const ( TaskDelayBetweenEachVal = 24 * time.Second RetryTaskDelay = 12 * time.Second - BridgeDBFlag = "bridge-db" + BridgeDBFlag = "bridge-db" + ProposersURLSizeLimit = 100 ) var logger log.Logger @@ -146,21 +147,23 @@ func CalculateTaskDelay(cliCtx cliContext.CLIContext) (bool, time.Duration) { // calculate validator position valPosition := 0 isCurrentValidator := false - response, err := helper.FetchFromAPI(cliCtx, helper.GetHeimdallServerEndpoint(CurrentValidatorSetURL)) + + proposersURL := fmt.Sprintf(ProposersURL, ProposersURLSizeLimit) + proposersResponse, err := helper.FetchFromAPI(cliCtx, helper.GetHeimdallServerEndpoint(proposersURL)) if err != nil { - logger.Error("Unable to send request for current validatorset", "url", CurrentValidatorSetURL, "error", err) + logger.Error("Unable to send request for proposers ", "url", proposersURL, "error", err) return isCurrentValidator, 0 } - // unmarshall data from buffer - var validatorSet hmtypes.ValidatorSet - err = json.Unmarshal(response.Result, &validatorSet) + + var proposers []hmtypes.Validator + err = json.Unmarshal(proposersResponse.Result, &proposers) if err != nil { - logger.Error("Error unmarshalling current validatorset data ", "error", err) + logger.Error("Error unmarshalling proposers data ", "error", err) return isCurrentValidator, 0 } - logger.Info("Fetched current validatorset list", "currentValidatorcount", len(validatorSet.Validators)) - for i, validator := range validatorSet.Validators { + logger.Info("Fetched proposers ", "currentValidatorsCount", len(proposers)) + for i, validator := range proposers { if bytes.Equal(validator.Signer.Bytes(), helper.GetAddress()) { valPosition = i + 1 isCurrentValidator = true