From 7d542373199fedac32b1166f23d53e39b60c6833 Mon Sep 17 00:00:00 2001 From: clarkchenc <651044554@qq.com> Date: Mon, 16 Jan 2023 18:14:51 +0800 Subject: [PATCH 1/3] chg: optimize ticker reset to prevent ticker shift --- bridge/setu/listener/base.go | 7 ++++++- bridge/setu/listener/heimdall.go | 6 +++++- bridge/setu/listener/tron.go | 6 +++++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/bridge/setu/listener/base.go b/bridge/setu/listener/base.go index 3dc142c8..e55bd932 100644 --- a/bridge/setu/listener/base.go +++ b/bridge/setu/listener/base.go @@ -3,6 +3,7 @@ package listener import ( "context" "math/big" + "sync" "time" "github.com/cosmos/cosmos-sdk/client" @@ -171,12 +172,16 @@ func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Dura // Setup the ticket and the channel to signal // the ending of the interval ticker := time.NewTicker(firstInterval) + var tickerOnce sync.Once // start listening for { select { case <-ticker.C: - ticker.Reset(interval) + tickerOnce.Do(func() { + ticker.Reset(interval) + }) + header, err := bl.chainClient.HeaderByNumber(ctx, nil) if err == nil && header != nil { // send data to channel diff --git a/bridge/setu/listener/heimdall.go b/bridge/setu/listener/heimdall.go index c405e06e..9b2e5371 100644 --- a/bridge/setu/listener/heimdall.go +++ b/bridge/setu/listener/heimdall.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "strconv" + "sync" "time" stakingTypes "github.com/maticnetwork/heimdall/staking/types" @@ -72,6 +73,7 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time. // Setup the ticket and the channel to signal // the ending of the interval ticker := time.NewTicker(firstInterval) + var tickerOnce sync.Once // var eventTypes []string // eventTypes = append(eventTypes, "message.action='checkpoint'") @@ -83,7 +85,9 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time. for { select { case <-ticker.C: - ticker.Reset(interval) + tickerOnce.Do(func() { + ticker.Reset(interval) + }) fromBlock, toBlock, err := hl.fetchFromAndToBlock() if err != nil { hl.Logger.Error("Error fetching fromBlock and toBlock...skipping events query", "error", err) diff --git a/bridge/setu/listener/tron.go b/bridge/setu/listener/tron.go index c35b0da6..c90c3f27 100644 --- a/bridge/setu/listener/tron.go +++ b/bridge/setu/listener/tron.go @@ -6,6 +6,7 @@ import ( "encoding/json" "math/big" "strconv" + "sync" "time" "github.com/RichardKnop/machinery/v1/tasks" @@ -92,12 +93,15 @@ func (tl *TronListener) StartPolling(ctx context.Context, pollInterval time.Dura // Setup the ticket and the channel to signal // the ending of the interval ticker := time.NewTicker(firstInterval) + var tickerOnce sync.Once // start listening for { select { case <-ticker.C: - ticker.Reset(interval) + tickerOnce.Do(func() { + ticker.Reset(interval) + }) headerNum, err := tl.contractConnector.GetTronLatestBlockNumber() if err == nil { // send data to channel From 11250d577e33bf2c75815d37698129ade37ffe44 Mon Sep 17 00:00:00 2001 From: clarkchenc <651044554@qq.com> Date: Thu, 19 Jan 2023 16:17:38 +0800 Subject: [PATCH 2/3] chg: optimize proposer rotation when meeting no-ack --- bridge/setu/processor/checkpoint.go | 28 ++++++++++++++++++++-------- bridge/setu/util/common.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/bridge/setu/processor/checkpoint.go b/bridge/setu/processor/checkpoint.go index dbcd22d8..2ea7d09f 100644 --- a/bridge/setu/processor/checkpoint.go +++ b/bridge/setu/processor/checkpoint.go @@ -10,6 +10,7 @@ import ( "math" "math/big" "strconv" + "sync" "time" cliContext "github.com/cosmos/cosmos-sdk/client/context" @@ -94,18 +95,30 @@ func (cp *CheckpointProcessor) RegisterTasks() { } func (cp *CheckpointProcessor) startPolling(ctx context.Context) { - tickerForNoAck := time.NewTicker(helper.GetConfig().NoACKPollInterval) + now := time.Now() + baseTime := time.Unix(0, 0) + // no-ack ticker interval keep same with checkpoint interval + noAckInterval := helper.GetConfig().CheckpointerPollInterval + // adjust no-ack ticker to tick at the middle of checkpoint interval + firstIntervalForNoAck := noAckInterval - (now.UTC().Sub(baseTime) % noAckInterval) - noAckInterval/2 // nolint: gomnd + if firstIntervalForNoAck <= 0 { + firstIntervalForNoAck += noAckInterval + } + tickerForNoAck := time.NewTicker(firstIntervalForNoAck) syncInterval := helper.GetConfig().CheckpointerPollInterval / 2 tickerForSync := time.NewTicker(syncInterval) // stop ticker when everything done defer tickerForNoAck.Stop() defer tickerForSync.Stop() - cp.Logger.Info("Start polling", "no-ack-interval", helper.GetConfig().NoACKPollInterval, "checkpoint-sync-interval", syncInterval) + cp.Logger.Info("Start polling", "no-ack-interval", noAckInterval, "checkpoint-sync-interval", syncInterval) + + adjustOnce := sync.Once{} for { select { case <-tickerForNoAck.C: + adjustOnce.Do(func() { tickerForNoAck.Reset(noAckInterval) }) go cp.handleCheckpointNoAck() case <-tickerForSync.C: go cp.handleCheckpointSync() @@ -392,11 +405,11 @@ func (cp *CheckpointProcessor) handleCheckpointNoAck() { return } - isNoAckRequired, count := cp.checkIfNoAckIsRequired(checkpointContext, lastCreatedAt) + isNoAckRequired, _ := cp.checkIfNoAckIsRequired(checkpointContext, lastCreatedAt) if isNoAckRequired { var isProposer bool - if isProposer, err = util.IsProposerByIndex(cp.cliCtx, count); err != nil { + if isProposer, err = util.IsValidator(cp.cliCtx); err != nil { cp.Logger.Error("Error checking IsProposer while proposing Checkpoint No-Ack ", "error", err) return } @@ -797,10 +810,10 @@ func (cp *CheckpointProcessor) checkIfNoAckIsRequired(checkpointContext *Checkpo checkpointCreationTime := time.Unix(lastCreatedAt, 0) currentTime := time.Now().UTC() + timeDiff := currentTime.Sub(checkpointCreationTime) - // check if last checkpoint was < NoACK wait time - if timeDiff.Seconds() >= helper.GetConfig().NoACKWaitTime.Seconds() && index == 0 { - index = math.Floor(timeDiff.Seconds() / helper.GetConfig().NoACKWaitTime.Seconds()) + if timeDiff.Seconds() >= helper.GetConfig().CheckpointerPollInterval.Seconds() && index == 0 { + index = math.Floor(timeDiff.Seconds() / helper.GetConfig().CheckpointerPollInterval.Seconds()) } if index == 0 { @@ -812,7 +825,6 @@ func (cp *CheckpointProcessor) checkIfNoAckIsRequired(checkpointContext *Checkpo // check if difference between no-ack time and current time lastNoAck := cp.getLastNoAckTime() - lastNoAckTime := time.Unix(int64(lastNoAck), 0) // if last no ack == 0 , first no-ack to be sent if currentTime.Sub(lastNoAckTime).Seconds() < checkpointParams.CheckpointBufferTime.Seconds() && lastNoAck != 0 { diff --git a/bridge/setu/util/common.go b/bridge/setu/util/common.go index 96ded4a9..1451f9ae 100644 --- a/bridge/setu/util/common.go +++ b/bridge/setu/util/common.go @@ -120,6 +120,34 @@ func IsProposerByIndex(cliCtx cliContext.CLIContext, index uint64) (bool, error) return false, nil } +func IsValidator(cliCtx cliContext.CLIContext) (bool, error) { + var validatorSet hmtypes.ValidatorSet + + result, err := helper.FetchFromAPI(cliCtx, + helper.GetHeimdallServerEndpoint(CurrentValidatorSetURL), + ) + if err != nil { + logger.Error("Error fetching proposers", "url", CurrentValidatorSetURL, "error", err) + + return false, fmt.Errorf("failed to query heimdall server: %w", err) + } + + err = json.Unmarshal(result.Result, &validatorSet) + if err != nil { + logger.Error("error unmarshalling proposer slice", "error", err) + + return false, fmt.Errorf("failed to unmarshal validatorSet: %w", err) + } + + for _, validator := range validatorSet.Validators { + if bytes.Equal(validator.Signer.Bytes(), helper.GetAddress()) { + return true, nil + } + } + + return false, nil +} + // IsInProposerList checks if we are in current proposer func IsInProposerList(cliCtx cliContext.CLIContext, count uint64) (bool, error) { logger.Debug("Skipping proposers", "count", strconv.FormatUint(count, 10)) From af332bc9df12aa59f0bb14f94bc0a1bc24c61bb9 Mon Sep 17 00:00:00 2001 From: clarkchenc <651044554@qq.com> Date: Fri, 10 Feb 2023 18:34:20 +0800 Subject: [PATCH 3/3] fix: fix a lint problem --- bridge/setu/listener/base.go | 2 +- bridge/setu/listener/heimdall.go | 2 +- bridge/setu/listener/tron.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bridge/setu/listener/base.go b/bridge/setu/listener/base.go index e55bd932..51724a6f 100644 --- a/bridge/setu/listener/base.go +++ b/bridge/setu/listener/base.go @@ -172,8 +172,8 @@ func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Dura // Setup the ticket and the channel to signal // the ending of the interval ticker := time.NewTicker(firstInterval) - var tickerOnce sync.Once + var tickerOnce sync.Once // start listening for { select { diff --git a/bridge/setu/listener/heimdall.go b/bridge/setu/listener/heimdall.go index 9b2e5371..2c031118 100644 --- a/bridge/setu/listener/heimdall.go +++ b/bridge/setu/listener/heimdall.go @@ -73,8 +73,8 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time. // Setup the ticket and the channel to signal // the ending of the interval ticker := time.NewTicker(firstInterval) - var tickerOnce sync.Once + var tickerOnce sync.Once // var eventTypes []string // eventTypes = append(eventTypes, "message.action='checkpoint'") // eventTypes = append(eventTypes, "message.action='event-record'") diff --git a/bridge/setu/listener/tron.go b/bridge/setu/listener/tron.go index c90c3f27..9eac7262 100644 --- a/bridge/setu/listener/tron.go +++ b/bridge/setu/listener/tron.go @@ -93,8 +93,8 @@ func (tl *TronListener) StartPolling(ctx context.Context, pollInterval time.Dura // Setup the ticket and the channel to signal // the ending of the interval ticker := time.NewTicker(firstInterval) - var tickerOnce sync.Once + var tickerOnce sync.Once // start listening for { select {