From 11250d577e33bf2c75815d37698129ade37ffe44 Mon Sep 17 00:00:00 2001 From: clarkchenc <651044554@qq.com> Date: Thu, 19 Jan 2023 16:17:38 +0800 Subject: [PATCH] chg: optimize proposer rotation when meeting no-ack --- bridge/setu/processor/checkpoint.go | 28 ++++++++++++++++++++-------- bridge/setu/util/common.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/bridge/setu/processor/checkpoint.go b/bridge/setu/processor/checkpoint.go index dbcd22d8..2ea7d09f 100644 --- a/bridge/setu/processor/checkpoint.go +++ b/bridge/setu/processor/checkpoint.go @@ -10,6 +10,7 @@ import ( "math" "math/big" "strconv" + "sync" "time" cliContext "github.com/cosmos/cosmos-sdk/client/context" @@ -94,18 +95,30 @@ func (cp *CheckpointProcessor) RegisterTasks() { } func (cp *CheckpointProcessor) startPolling(ctx context.Context) { - tickerForNoAck := time.NewTicker(helper.GetConfig().NoACKPollInterval) + now := time.Now() + baseTime := time.Unix(0, 0) + // no-ack ticker interval keep same with checkpoint interval + noAckInterval := helper.GetConfig().CheckpointerPollInterval + // adjust no-ack ticker to tick at the middle of checkpoint interval + firstIntervalForNoAck := noAckInterval - (now.UTC().Sub(baseTime) % noAckInterval) - noAckInterval/2 // nolint: gomnd + if firstIntervalForNoAck <= 0 { + firstIntervalForNoAck += noAckInterval + } + tickerForNoAck := time.NewTicker(firstIntervalForNoAck) syncInterval := helper.GetConfig().CheckpointerPollInterval / 2 tickerForSync := time.NewTicker(syncInterval) // stop ticker when everything done defer tickerForNoAck.Stop() defer tickerForSync.Stop() - cp.Logger.Info("Start polling", "no-ack-interval", helper.GetConfig().NoACKPollInterval, "checkpoint-sync-interval", syncInterval) + cp.Logger.Info("Start polling", "no-ack-interval", noAckInterval, "checkpoint-sync-interval", syncInterval) + + adjustOnce := sync.Once{} for { select { case <-tickerForNoAck.C: + adjustOnce.Do(func() { tickerForNoAck.Reset(noAckInterval) }) go cp.handleCheckpointNoAck() case <-tickerForSync.C: go cp.handleCheckpointSync() @@ -392,11 +405,11 @@ func (cp *CheckpointProcessor) handleCheckpointNoAck() { return } - isNoAckRequired, count := cp.checkIfNoAckIsRequired(checkpointContext, lastCreatedAt) + isNoAckRequired, _ := cp.checkIfNoAckIsRequired(checkpointContext, lastCreatedAt) if isNoAckRequired { var isProposer bool - if isProposer, err = util.IsProposerByIndex(cp.cliCtx, count); err != nil { + if isProposer, err = util.IsValidator(cp.cliCtx); err != nil { cp.Logger.Error("Error checking IsProposer while proposing Checkpoint No-Ack ", "error", err) return } @@ -797,10 +810,10 @@ func (cp *CheckpointProcessor) checkIfNoAckIsRequired(checkpointContext *Checkpo checkpointCreationTime := time.Unix(lastCreatedAt, 0) currentTime := time.Now().UTC() + timeDiff := currentTime.Sub(checkpointCreationTime) - // check if last checkpoint was < NoACK wait time - if timeDiff.Seconds() >= helper.GetConfig().NoACKWaitTime.Seconds() && index == 0 { - index = math.Floor(timeDiff.Seconds() / helper.GetConfig().NoACKWaitTime.Seconds()) + if timeDiff.Seconds() >= helper.GetConfig().CheckpointerPollInterval.Seconds() && index == 0 { + index = math.Floor(timeDiff.Seconds() / helper.GetConfig().CheckpointerPollInterval.Seconds()) } if index == 0 { @@ -812,7 +825,6 @@ func (cp *CheckpointProcessor) checkIfNoAckIsRequired(checkpointContext *Checkpo // check if difference between no-ack time and current time lastNoAck := cp.getLastNoAckTime() - lastNoAckTime := time.Unix(int64(lastNoAck), 0) // if last no ack == 0 , first no-ack to be sent if currentTime.Sub(lastNoAckTime).Seconds() < checkpointParams.CheckpointBufferTime.Seconds() && lastNoAck != 0 { diff --git a/bridge/setu/util/common.go b/bridge/setu/util/common.go index 96ded4a9..1451f9ae 100644 --- a/bridge/setu/util/common.go +++ b/bridge/setu/util/common.go @@ -120,6 +120,34 @@ func IsProposerByIndex(cliCtx cliContext.CLIContext, index uint64) (bool, error) return false, nil } +func IsValidator(cliCtx cliContext.CLIContext) (bool, error) { + var validatorSet hmtypes.ValidatorSet + + result, err := helper.FetchFromAPI(cliCtx, + helper.GetHeimdallServerEndpoint(CurrentValidatorSetURL), + ) + if err != nil { + logger.Error("Error fetching proposers", "url", CurrentValidatorSetURL, "error", err) + + return false, fmt.Errorf("failed to query heimdall server: %w", err) + } + + err = json.Unmarshal(result.Result, &validatorSet) + if err != nil { + logger.Error("error unmarshalling proposer slice", "error", err) + + return false, fmt.Errorf("failed to unmarshal validatorSet: %w", err) + } + + for _, validator := range validatorSet.Validators { + if bytes.Equal(validator.Signer.Bytes(), helper.GetAddress()) { + return true, nil + } + } + + return false, nil +} + // IsInProposerList checks if we are in current proposer func IsInProposerList(cliCtx cliContext.CLIContext, count uint64) (bool, error) { logger.Debug("Skipping proposers", "count", strconv.FormatUint(count, 10))