From 7d03bfc52ad6e5bca8fed7e39d06c19ecc80ced1 Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Mon, 10 Jan 2022 17:55:43 +0800 Subject: [PATCH 1/2] fix: msg broadcast delay && checkpointSyncACk msg broadcast --- bridge/setu/broadcaster/broadcaster.go | 11 ++++++++ bridge/setu/processor/checkpointsync.go | 32 +++++++++++++--------- bridge/setu/util/common.go | 35 ++++++++++++++++--------- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/bridge/setu/broadcaster/broadcaster.go b/bridge/setu/broadcaster/broadcaster.go index 377e1312..d2c06d81 100644 --- a/bridge/setu/broadcaster/broadcaster.go +++ b/bridge/setu/broadcaster/broadcaster.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/cosmos/cosmos-sdk/client" cliContext "github.com/cosmos/cosmos-sdk/client/context" @@ -56,6 +57,16 @@ func NewTxBroadcaster(cdc *codec.Codec) *TxBroadcaster { return &txBroadcaster } +func (tb *TxBroadcaster) BroadcastToHeimdallWithDelay(msg sdk.Msg, delay time.Duration) error { + eta := time.Now().Add(delay) + timeAfterTrigger := time.After(delay) + tb.logger.Info("Tx sent on heimdall ", "msg", msg.Type(), "currentTime", time.Now(), "delayTime", eta) + select { + case <-timeAfterTrigger: + return tb.BroadcastToHeimdall(msg) + } +} + // BroadcastToHeimdall broadcast to heimdall func (tb *TxBroadcaster) BroadcastToHeimdall(msg sdk.Msg) error { tb.heimdallMutex.Lock() diff --git a/bridge/setu/processor/checkpointsync.go b/bridge/setu/processor/checkpointsync.go index 3ed207fe..01b82d36 100644 --- a/bridge/setu/processor/checkpointsync.go +++ b/bridge/setu/processor/checkpointsync.go @@ -61,13 +61,15 @@ func (cp *CheckpointProcessor) handleCheckpointSync() { msg := checkpointTypes.NewMsgCheckpointSync(hmTypes.BytesToHeimdallAddress(helper.GetAddress()), proposer, nextCheckpointNumber, start, end, rootChain) // return broadcast to heimdall - if err := cp.txBroadcaster.BroadcastToHeimdall(msg); err != nil { - cp.Logger.Error("Error while broadcasting checkpoint-sync to heimdall", - "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end, "error", err) - continue + if isCurrentValidator, delay := util.CalculateTaskDelay(cp.cliCtx); isCurrentValidator { + if err := cp.txBroadcaster.BroadcastToHeimdallWithDelay(msg, delay); err != nil { + cp.Logger.Error("Error while broadcasting checkpoint-sync to heimdall", + "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end, "error", err) + continue + } + cp.Logger.Info("checkpoint sync transaction sent successfully", + "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end) } - cp.Logger.Info("checkpoint sync transaction sent successfully", - "root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end) } } } @@ -183,13 +185,17 @@ func (cp *CheckpointProcessor) sendCheckpointSyncAckToHeimdall(eventName string, ) // fetch checkpoint sync buffer bufferedCheckpoint, err := util.GetBufferedCheckpointSync(cp.cliCtx, checkpointChain) - if err == nil { - bufferedTime := time.Unix(int64(bufferedCheckpoint.TimeStamp), 0) - currentTime := time.Now().UTC() - if currentTime.Sub(bufferedTime).Seconds() > checkpointParams.CheckpointBufferTime.Seconds()/5 { - cp.Logger.Debug("checkpoint sync buffer has expired, ignore this ack") - return nil - } + + if err != nil { + cp.Logger.Debug("checkpoint sync buffer has been cleared, this ack already submitted.") + return nil + } + + bufferedTime := time.Unix(int64(bufferedCheckpoint.TimeStamp), 0) + currentTime := time.Now().UTC() + if currentTime.Sub(bufferedTime).Seconds() > checkpointParams.CheckpointBufferTime.Seconds()/5 { + cp.Logger.Debug("checkpoint sync buffer has expired, ignore this ack") + return nil } // create msg checkpoint ack message diff --git a/bridge/setu/util/common.go b/bridge/setu/util/common.go index 7376fc9c..c5567838 100644 --- a/bridge/setu/util/common.go +++ b/bridge/setu/util/common.go @@ -143,34 +143,43 @@ func IsInProposerList(cliCtx cliContext.CLIContext, count uint64) (bool, error) // CalculateTaskDelay calculates delay required for current validator to propose the tx // It solves for multiple validators sending same transaction. func CalculateTaskDelay(cliCtx cliContext.CLIContext) (bool, time.Duration) { - // calculate validator position - valPosition := 0 - isCurrentValidator := false response, err := helper.FetchFromAPI(cliCtx, helper.GetHeimdallServerEndpoint(CurrentValidatorSetURL)) if err != nil { logger.Error("Unable to send request for current validatorset", "url", CurrentValidatorSetURL, "error", err) - return isCurrentValidator, 0 + return false, 0 } // unmarshall data from buffer var validatorSet hmtypes.ValidatorSet err = json.Unmarshal(response.Result, &validatorSet) if err != nil { logger.Error("Error unmarshalling current validatorset data ", "error", err) - return isCurrentValidator, 0 + return false, 0 } logger.Info("Fetched current validatorset list", "currentValidatorcount", len(validatorSet.Validators)) - for i, validator := range validatorSet.Validators { - if bytes.Equal(validator.Signer.Bytes(), helper.GetAddress()) { - valPosition = i + 1 - isCurrentValidator = true - break - } + + validators := validatorSet.Validators + proposer := validatorSet.GetProposer().Signer.Bytes() + localAddress := helper.GetAddress() + + proposerIndex, _ := validatorSet.GetByAddress(proposer) + localIndex, _ := validatorSet.GetByAddress(localAddress) + + if localIndex < 0 { + return false, 0 } + // temp index + tempIndex := localIndex + if tempIndex < proposerIndex { + tempIndex = tempIndex + len(validators) + } + + delay := tempIndex - proposerIndex + 1 + // calculate delay - taskDelay := time.Duration(valPosition) * TaskDelayBetweenEachVal - return isCurrentValidator, taskDelay + taskDelay := time.Duration(delay) * TaskDelayBetweenEachVal + return true, taskDelay } // IsCurrentProposer checks if we are current proposer From 23b64f5134fdf96732c4796f8905dd1ed9d946bf Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Mon, 10 Jan 2022 18:40:15 +0800 Subject: [PATCH 2/2] fix: change CalculateTaskDelay URL --- bridge/setu/util/common.go | 54 +++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/bridge/setu/util/common.go b/bridge/setu/util/common.go index c5567838..d387cd7f 100644 --- a/bridge/setu/util/common.go +++ b/bridge/setu/util/common.go @@ -63,7 +63,8 @@ const ( TaskDelayBetweenEachVal = 24 * time.Second RetryTaskDelay = 12 * time.Second - BridgeDBFlag = "bridge-db" + BridgeDBFlag = "bridge-db" + ProposersURLSizeLimit = 100 ) var logger log.Logger @@ -143,43 +144,36 @@ func IsInProposerList(cliCtx cliContext.CLIContext, count uint64) (bool, error) // CalculateTaskDelay calculates delay required for current validator to propose the tx // It solves for multiple validators sending same transaction. func CalculateTaskDelay(cliCtx cliContext.CLIContext) (bool, time.Duration) { - response, err := helper.FetchFromAPI(cliCtx, helper.GetHeimdallServerEndpoint(CurrentValidatorSetURL)) - if err != nil { - logger.Error("Unable to send request for current validatorset", "url", CurrentValidatorSetURL, "error", err) - return false, 0 - } - // unmarshall data from buffer - var validatorSet hmtypes.ValidatorSet - err = json.Unmarshal(response.Result, &validatorSet) + // calculate validator position + valPosition := 0 + isCurrentValidator := false + + proposersURL := fmt.Sprintf(ProposersURL, ProposersURLSizeLimit) + proposersResponse, err := helper.FetchFromAPI(cliCtx, helper.GetHeimdallServerEndpoint(proposersURL)) if err != nil { - logger.Error("Error unmarshalling current validatorset data ", "error", err) - return false, 0 + logger.Error("Unable to send request for proposers ", "url", proposersURL, "error", err) + return isCurrentValidator, 0 } - logger.Info("Fetched current validatorset list", "currentValidatorcount", len(validatorSet.Validators)) - - validators := validatorSet.Validators - proposer := validatorSet.GetProposer().Signer.Bytes() - localAddress := helper.GetAddress() - - proposerIndex, _ := validatorSet.GetByAddress(proposer) - localIndex, _ := validatorSet.GetByAddress(localAddress) - - if localIndex < 0 { - return false, 0 + var proposers []hmtypes.Validator + err = json.Unmarshal(proposersResponse.Result, &proposers) + if err != nil { + logger.Error("Error unmarshalling proposers data ", "error", err) + return isCurrentValidator, 0 } - // temp index - tempIndex := localIndex - if tempIndex < proposerIndex { - tempIndex = tempIndex + len(validators) + logger.Info("Fetched proposers ", "currentValidatorsCount", len(proposers)) + for i, validator := range proposers { + if bytes.Equal(validator.Signer.Bytes(), helper.GetAddress()) { + valPosition = i + 1 + isCurrentValidator = true + break + } } - delay := tempIndex - proposerIndex + 1 - // calculate delay - taskDelay := time.Duration(delay) * TaskDelayBetweenEachVal - return true, taskDelay + taskDelay := time.Duration(valPosition) * TaskDelayBetweenEachVal + return isCurrentValidator, taskDelay } // IsCurrentProposer checks if we are current proposer