Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions bridge/setu/processor/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func (cp *CheckpointProcessor) Start() error {
// no-ack
ackCtx, cancelNoACKPolling := context.WithCancel(context.Background())
cp.cancelNoACKPolling = cancelNoACKPolling
cp.Logger.Info("Start polling for no-ack", "pollInterval", helper.GetConfig().NoACKPollInterval)
go cp.startPollingForNoAck(ackCtx, helper.GetConfig().NoACKPollInterval)
go cp.startPolling(ackCtx)
return nil
}

Expand All @@ -94,21 +93,24 @@ func (cp *CheckpointProcessor) RegisterTasks() {
}
}

func (cp *CheckpointProcessor) startPollingForNoAck(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
ticker1 := time.NewTicker(helper.GetConfig().CheckpointerPollInterval)
func (cp *CheckpointProcessor) startPolling(ctx context.Context) {
tickerForNoAck := time.NewTicker(helper.GetConfig().NoACKPollInterval)

syncInterval := helper.GetConfig().CheckpointerPollInterval / 2
tickerForSync := time.NewTicker(syncInterval)
// stop ticker when everything done
defer ticker.Stop()
defer ticker1.Stop()
defer tickerForNoAck.Stop()
defer tickerForSync.Stop()

cp.Logger.Info("Start polling", "no-ack-interval", helper.GetConfig().NoACKPollInterval, "checkpoint-sync-interval", syncInterval)
for {
select {
case <-ticker.C:
case <-tickerForNoAck.C:
go cp.handleCheckpointNoAck()
case <-ticker1.C:
case <-tickerForSync.C:
go cp.handleCheckpointSync()
case <-ctx.Done():
cp.Logger.Info("No-ack Polling stopped")
ticker.Stop()
cp.Logger.Info("Polling stopped")
return
}
}
Expand Down Expand Up @@ -402,8 +404,7 @@ func (cp *CheckpointProcessor) handleCheckpointNoAck() {
}
}

// nextExpectedCheckpoint - fetched contract checkpoint state and returns the next probable checkpoint that needs to be sent
func (cp *CheckpointProcessor) nextExpectedCheckpoint(checkpointContext *CheckpointContext, latestChildBlock uint64, rootChain string) (*ContractCheckpoint, error) {
func (cp *CheckpointProcessor) getCurrentCheckpoint(checkpointContext *CheckpointContext, rootChain string) (*CheckpointInfo, error) {
chainmanagerParams := checkpointContext.ChainmanagerParams
checkpointParams := checkpointContext.CheckpointParams

Expand All @@ -413,25 +414,42 @@ func (cp *CheckpointProcessor) nextExpectedCheckpoint(checkpointContext *Checkpo
}

// fetch current header block from mainchain contract
_currentHeaderBlock, err := cp.contractConnector.CurrentHeaderBlock(rootChainInstance, checkpointParams.ChildBlockInterval)
currentHeaderNumber, err := cp.contractConnector.CurrentHeaderBlock(rootChainInstance, checkpointParams.ChildBlockInterval)
if err != nil {
cp.Logger.Error("Error while fetching current header block number from rootchain", "root", rootChain, "error", err)
return nil, err
}

// current header block
currentHeaderBlockNumber := big.NewInt(0).SetUint64(_currentHeaderBlock)
// get header info
rootHash, start, end, createAt, proposer, err := cp.contractConnector.GetHeaderInfo(currentHeaderNumber, rootChainInstance, checkpointParams.ChildBlockInterval)
if err != nil {
cp.Logger.Error("Error while fetching current header block object from rootchain", "root", rootChain, "error", err)
return nil, err
}

return &CheckpointInfo{
rootHash: rootHash,
number: currentHeaderNumber,
start: start,
end: end,
createTime: createAt,
proposer: proposer,
}, nil
}

// nextExpectedCheckpoint - fetched contract checkpoint state and returns the next probable checkpoint that needs to be sent
func (cp *CheckpointProcessor) nextExpectedCheckpoint(checkpointContext *CheckpointContext, latestChildBlock uint64, rootChain string) (*ContractCheckpoint, error) {
checkpointParams := checkpointContext.CheckpointParams
// get header info
_, currentStart, currentEnd, lastCheckpointTime, _, err := cp.contractConnector.GetHeaderInfo(currentHeaderBlockNumber.Uint64(), rootChainInstance, checkpointParams.ChildBlockInterval)
currentCheckpointInfo, err := cp.getCurrentCheckpoint(checkpointContext, rootChain)
if err != nil {
cp.Logger.Error("Error while fetching current header block object from rootchain", "root", rootChain, "error", err)
return nil, err
}

// find next start/end
var start, end uint64
start = currentEnd
start = currentCheckpointInfo.end

// add 1 if start > 0
if start > 0 {
Expand Down Expand Up @@ -466,11 +484,11 @@ func (cp *CheckpointProcessor) nextExpectedCheckpoint(checkpointContext *Checkpo

currentTime := time.Now().UTC().Unix()
defaultForcePushInterval := checkpointParams.MaxCheckpointLength * 2 // in seconds (1024 * 2 seconds)
if currentTime-int64(lastCheckpointTime) > int64(defaultForcePushInterval) {
if currentTime-int64(currentCheckpointInfo.createTime) > int64(defaultForcePushInterval) {
end = latestChildBlock
cp.Logger.Info("Force push checkpoint",
"currentTime", currentTime,
"lastCheckpointTime", lastCheckpointTime,
"lastCheckpointTime", currentCheckpointInfo.createTime,
"defaultForcePushInterval", defaultForcePushInterval,
"start", start,
"end", end,
Expand All @@ -483,9 +501,9 @@ func (cp *CheckpointProcessor) nextExpectedCheckpoint(checkpointContext *Checkpo
// return nil, errors.New("Invalid start end formation")
// }
return NewContractCheckpoint(start, end, &HeaderBlock{
start: currentStart,
end: currentEnd,
number: currentHeaderBlockNumber,
start: currentCheckpointInfo.start,
end: currentCheckpointInfo.end,
number: big.NewInt(0).SetUint64(currentCheckpointInfo.number),
}), nil
}

Expand Down
18 changes: 13 additions & 5 deletions bridge/setu/processor/checkpointsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,35 @@ func (cp *CheckpointProcessor) handleCheckpointSync() {
cp.Logger.Error("Error fetching syncedHeaderNumber from stakeChain", "root", rootChain, "error", err)
continue
}
nextCheckpointNumber := lastSyncedCheckpointNumber + 1
nextSyncCheckpointNumber := lastSyncedCheckpointNumber + 1
// fetch nextHeaderBlock from rootChain
start, end, proposer, err := cp.getHeaderBlock(checkpointContext, nextCheckpointNumber, rootChain)
start, end, proposer, err := cp.getHeaderBlock(checkpointContext, nextSyncCheckpointNumber, rootChain)
if err != nil {
cp.Logger.Error("Error fetching currentHeaderBlock from rootChain", "root", rootChain, "error", err)
continue
}

latestCheckpointInfo, err := cp.getCurrentCheckpoint(checkpointContext, rootChain)
if err != nil {
cp.Logger.Error("Error get latest checkpoint", "root", rootChain)
continue
}

cp.Logger.Info("checkpoint sync is ready to send", "root", rootChain, "checkpointSyncNumber", nextSyncCheckpointNumber, "start", start, "end", end,
"latestCheckpointNumber", latestCheckpointInfo.number)
if end != 0 {
// send checkpoint sync
msg := checkpointTypes.NewMsgCheckpointSync(hmTypes.BytesToHeimdallAddress(helper.GetAddress()),
proposer, nextCheckpointNumber, start, end, rootChain)
proposer, nextSyncCheckpointNumber, start, end, rootChain)
// return broadcast to heimdall
if isCurrentValidator, delay := util.CalculateTaskDelay(cp.cliCtx); isCurrentValidator {
if err := cp.txBroadcaster.BroadcastToHeimdallWithDelay(msg, delay); err != nil {
cp.Logger.Error("Error while broadcasting checkpoint-sync to heimdall",
"root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end, "error", err)
"root", rootChain, "number", nextSyncCheckpointNumber, "start", start, "end", end, "error", err)
continue
}
cp.Logger.Info("checkpoint sync transaction sent successfully",
"root", rootChain, "number", nextCheckpointNumber, "start", start, "end", end)
"root", rootChain, "number", nextSyncCheckpointNumber, "start", start, "end", end)
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions bridge/setu/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package processor
import (
"fmt"
"math/big"

"github.com/maticnetwork/bor/common"
"github.com/maticnetwork/heimdall/types"
)

// HeaderBlock header block
Expand All @@ -12,6 +15,15 @@ type HeaderBlock struct {
number *big.Int
}

type CheckpointInfo struct {
rootHash common.Hash
number uint64
start uint64
end uint64
createTime uint64
proposer types.HeimdallAddress
}

// ContractCheckpoint contract checkpoint
type ContractCheckpoint struct {
newStart uint64
Expand Down