Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/dappnode/validator-tracker/internal/adapters/dappmanager"
"github.com/dappnode/validator-tracker/internal/adapters/notifier"
"github.com/dappnode/validator-tracker/internal/adapters/web3signer"
"github.com/dappnode/validator-tracker/internal/application/domain"
"github.com/dappnode/validator-tracker/internal/application/services"
"github.com/dappnode/validator-tracker/internal/config"
"github.com/dappnode/validator-tracker/internal/logger"
Expand Down Expand Up @@ -47,12 +46,11 @@ func main() {

// Start the duties checker service in a goroutine
dutiesChecker := &services.DutiesChecker{
Beacon: beacon,
Signer: web3Signer,
Notifier: notifier,
Dappmanager: dappmanager,
PollInterval: 1 * time.Minute,
CheckedEpochs: make(map[domain.ValidatorIndex]domain.Epoch),
Beacon: beacon,
Signer: web3Signer,
Notifier: notifier,
Dappmanager: dappmanager,
PollInterval: 1 * time.Minute,
}
wg.Add(1)
go func() {
Expand Down
84 changes: 29 additions & 55 deletions internal/application/services/dutieschecker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,41 @@ type DutiesChecker struct {

PollInterval time.Duration
lastJustifiedEpoch domain.Epoch
CheckedEpochs map[domain.ValidatorIndex]domain.Epoch // latest epoch checked for each validator index

// lastLivenessState tracks the last liveness notification sent: nil = no notification sent yet (first run),
// true = last notification was online, false = last notification was offline
lastLivenessState *bool
lastLivenessState *bool
lastRunHadError bool
}

// If at interval, ticker ticks but check has not ended, we wont start a new check, we will just wait for the next tick.
func (a *DutiesChecker) Run(ctx context.Context) {
ticker := time.NewTicker(a.PollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
a.checkLatestJustifiedEpoch(ctx)
justifiedEpoch, err := a.Beacon.GetJustifiedEpoch(ctx)
if err != nil {
logger.Error("Error fetching justified epoch: %v", err)
a.lastRunHadError = true
continue
}

if justifiedEpoch == a.lastJustifiedEpoch && !a.lastRunHadError {
logger.Debug("Justified epoch %d unchanged and last run was successful, skipping check.", justifiedEpoch)
continue
}

a.lastJustifiedEpoch = justifiedEpoch
a.lastRunHadError = a.performChecks(ctx, justifiedEpoch) != nil

case <-ctx.Done():
return
}
}
}

func (a *DutiesChecker) checkLatestJustifiedEpoch(ctx context.Context) {
// TODO: we want to keep checking on error until?
// 1. info required to determine if notification was required to be sent
// 2. if notification x was suppose to be sent and could not

justifiedEpoch, err := a.Beacon.GetJustifiedEpoch(ctx)
if err != nil {
logger.Error("Error fetching justified epoch: %v", err)
return
}
if justifiedEpoch == a.lastJustifiedEpoch {
logger.Debug("Justified epoch %d unchanged, skipping check.", justifiedEpoch)
return
}
a.lastJustifiedEpoch = justifiedEpoch
func (a *DutiesChecker) performChecks(ctx context.Context, justifiedEpoch domain.Epoch) error {
logger.Info("New justified epoch %d detected.", justifiedEpoch)

// Check for notifications
notificationsEnabled, err := a.Dappmanager.GetNotificationsEnabled(ctx)
if err != nil {
logger.Warn("Error fetching notifications enabled, notification will not be sent: %v", err)
Expand All @@ -65,27 +60,25 @@ func (a *DutiesChecker) checkLatestJustifiedEpoch(ctx context.Context) {
pubkeys, err := a.Signer.GetValidatorPubkeys()
if err != nil {
logger.Error("Error fetching pubkeys from web3signer: %v", err)
return
return err
}

indices, err := a.Beacon.GetValidatorIndicesByPubkeys(ctx, pubkeys)
if err != nil {
logger.Error("Error fetching validator indices from beacon node: %v", err)
return
return err
}
logger.Info("Found %d validator indices active", len(indices))

validatorIndices := a.getValidatorsToCheck(indices, justifiedEpoch)
if len(validatorIndices) == 0 {
logger.Debug("No validators left to check for epoch %d", justifiedEpoch)
return
if len(indices) == 0 {
logger.Debug("No validators found to check for epoch %d", justifiedEpoch)
return nil
}

// Liveness notification logic
offline, _, allLive, err := a.checkLiveness(ctx, justifiedEpoch, validatorIndices)
offline, _, allLive, err := a.checkLiveness(ctx, justifiedEpoch, indices)
if err != nil {
logger.Error("Error checking liveness for validators: %v", err)
return
return err
}
if len(offline) > 0 && (a.lastLivenessState == nil || *a.lastLivenessState) {
if notificationsEnabled[domain.ValidatorLiveness] {
Expand All @@ -98,19 +91,18 @@ func (a *DutiesChecker) checkLatestJustifiedEpoch(ctx context.Context) {
}
if allLive && (a.lastLivenessState == nil || !*a.lastLivenessState) {
if notificationsEnabled[domain.ValidatorLiveness] {
if err := a.Notifier.SendValidatorLivenessNot(validatorIndices, true); err != nil {
if err := a.Notifier.SendValidatorLivenessNot(indices, true); err != nil {
logger.Warn("Error sending validator liveness notification: %v", err)
}
}
val := true
a.lastLivenessState = &val
}

// Block proposal notification logic
proposed, missed, err := a.checkProposals(ctx, justifiedEpoch, validatorIndices)
proposed, missed, err := a.checkProposals(ctx, justifiedEpoch, indices)
if err != nil {
logger.Error("Error checking block proposals: %v", err)
return
return err
}
if len(proposed) > 0 && notificationsEnabled[domain.BlockProposal] {
if err := a.Notifier.SendBlockProposalNot(proposed, int(justifiedEpoch), true); err != nil {
Expand All @@ -123,10 +115,7 @@ func (a *DutiesChecker) checkLatestJustifiedEpoch(ctx context.Context) {
}
}

// Mark validators as checked
for _, index := range validatorIndices {
a.CheckedEpochs[index] = justifiedEpoch
}
return nil
}

func (a *DutiesChecker) checkLiveness(
Expand Down Expand Up @@ -188,18 +177,3 @@ func (a *DutiesChecker) checkProposals(
}
return proposed, missed, nil
}

func (a *DutiesChecker) getValidatorsToCheck(indices []domain.ValidatorIndex, epoch domain.Epoch) []domain.ValidatorIndex {
var result []domain.ValidatorIndex
for _, index := range indices {
if a.wasCheckedThisEpoch(index, epoch) {
continue
}
result = append(result, index)
}
return result
}

func (a *DutiesChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool {
return a.CheckedEpochs[index] == epoch
}