Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func main() {
defer cancel()
var wg sync.WaitGroup

// Start the attestation checker service in a goroutine
logger.Info("Starting attestation checker for %d validators", len(indices))
checker := &services.AttestationChecker{
// Start the duties checker service in a goroutine
logger.Info("Starting duties checker for %d validators", len(indices))
dutiesChecker := &services.DutiesChecker{
BeaconAdapter: adapter,
Web3SignerAdapter: web3Signer,
PollInterval: 1 * time.Minute,
Expand All @@ -58,7 +58,7 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
checker.Run(ctx)
dutiesChecker.Run(ctx)
}()

// Handle graceful shutdown
Expand Down
41 changes: 41 additions & 0 deletions internal/adapters/attestantclient_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,44 @@ func (b *beaconAttestantClient) GetValidatorIndicesByPubkeys(ctx context.Context
}
return indices, nil
}

// GetProposerDuties retrieves proposer duties for the given epoch and validator indices.
func (b *beaconAttestantClient) GetProposerDuties(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) ([]domain.ProposerDuty, error) {
var beaconIndices []phase0.ValidatorIndex
for _, idx := range indices {
beaconIndices = append(beaconIndices, phase0.ValidatorIndex(idx))
}

resp, err := b.client.ProposerDuties(ctx, &api.ProposerDutiesOpts{
Epoch: phase0.Epoch(epoch),
Indices: beaconIndices,
})
if err != nil {
return nil, err
}

var duties []domain.ProposerDuty
for _, d := range resp.Data {
duties = append(duties, domain.ProposerDuty{
Slot: domain.Slot(d.Slot),
ValidatorIndex: domain.ValidatorIndex(d.ValidatorIndex),
})
}
return duties, nil
}

// DidProposeBlock checks a given slot includes a block proposed
func (b *beaconAttestantClient) DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error) {
block, err := b.client.SignedBeaconBlock(ctx, &api.SignedBeaconBlockOpts{
Block: fmt.Sprintf("%d", slot),
})
if err != nil {
// TODO: are we sure we can assume that a 404 means the block was not proposed?
// What error code is returned in all consensus if the block is not in their state?
if apiErr, ok := err.(*api.Error); ok && apiErr.StatusCode == 404 {
return false, nil // Block was not proposed
}
return false, err // Real error
}
return block != nil && block.Data != nil, nil
}
20 changes: 17 additions & 3 deletions internal/application/domain/validator.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
package domain

// --------------------------------------------------------

// Domain types used for anything related to validators
type Epoch uint64
type Slot uint64
type ValidatorIndex uint64
type CommitteeIndex uint64

// --------------------------------------------------------

// Attestation-related types
type ValidatorDuty struct {
Slot Slot
CommitteeIndex CommitteeIndex
ValidatorCommitteeIdx uint64
ValidatorIndex ValidatorIndex
}

type CommitteeSizeMap map[CommitteeIndex]int

type Attestation struct {
DataSlot Slot
CommitteeBits []byte
AggregationBits []byte
}

type CommitteeSizeMap map[CommitteeIndex]int
type CommitteeIndex uint64

// --------------------------------------------------------

// Proposer-related types
type ProposerDuty struct {
Slot Slot
ValidatorIndex ValidatorIndex
}
3 changes: 3 additions & 0 deletions internal/application/ports/beaconchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ type BeaconChainAdapter interface {
GetCommitteeSizeMap(ctx context.Context, slot domain.Slot) (domain.CommitteeSizeMap, error)
GetBlockAttestations(ctx context.Context, slot domain.Slot) ([]domain.Attestation, error)
GetValidatorIndicesByPubkeys(ctx context.Context, pubkeys []string) ([]domain.ValidatorIndex, error)

GetProposerDuties(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) ([]domain.ProposerDuty, error)
DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/dappnode/validator-tracker/internal/logger"
)

type AttestationChecker struct {
type DutiesChecker struct {
BeaconAdapter ports.BeaconChainAdapter
Web3SignerAdapter ports.Web3SignerAdapter
PollInterval time.Duration
Expand All @@ -19,7 +19,7 @@ type AttestationChecker struct {
}

// If at interval, ticker ticks but check has not ended, we wont start a new check, we will just wait for the next tick.
func (a *AttestationChecker) Run(ctx context.Context) {
func (a *DutiesChecker) Run(ctx context.Context) {
ticker := time.NewTicker(a.PollInterval)
defer ticker.Stop()

Expand All @@ -33,7 +33,7 @@ func (a *AttestationChecker) Run(ctx context.Context) {
}
}

func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) {
func (a *DutiesChecker) checkLatestFinalizedEpoch(ctx context.Context) {
finalizedEpoch, err := a.BeaconAdapter.GetFinalizedEpoch(ctx)
if err != nil {
logger.Error("Error fetching finalized epoch: %v", err)
Expand All @@ -43,7 +43,6 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) {
logger.Debug("Finalized epoch %d unchanged, skipping check.", finalizedEpoch)
return
}

a.lastFinalizedEpoch = finalizedEpoch
logger.Info("New finalized epoch %d detected.", finalizedEpoch)

Expand All @@ -62,9 +61,50 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) {

validatorIndices := a.getValidatorsToCheck(indices, finalizedEpoch)
if len(validatorIndices) == 0 {
logger.Debug("No validators left to check for epoch %d", finalizedEpoch)
return
}

// Split proposal vs attestation logic
a.checkProposals(ctx, finalizedEpoch, validatorIndices)
a.checkAttestations(ctx, finalizedEpoch, validatorIndices)
}

func (a *DutiesChecker) checkProposals(
ctx context.Context,
finalizedEpoch domain.Epoch,
indices []domain.ValidatorIndex,
) {
proposerDuties, err := a.BeaconAdapter.GetProposerDuties(ctx, finalizedEpoch, indices)
if err != nil {
logger.Error("Error fetching proposer duties: %v", err)
return
}

if len(proposerDuties) == 0 {
logger.Warn("No proposer duties found for finalized epoch %d.", finalizedEpoch)
return
}

for _, duty := range proposerDuties {
didPropose, err := a.BeaconAdapter.DidProposeBlock(ctx, duty.Slot)
if err != nil {
logger.Warn("⚠️ Could not determine if block was proposed at slot %d: %v", duty.Slot, err)
continue
}
if didPropose {
logger.Info("✅ Validator %d successfully proposed a block at slot %d", duty.ValidatorIndex, duty.Slot)
} else {
logger.Warn("❌ Validator %d was scheduled to propose at slot %d but did not", duty.ValidatorIndex, duty.Slot)
}
}
}

func (a *DutiesChecker) checkAttestations(
ctx context.Context,
finalizedEpoch domain.Epoch,
validatorIndices []domain.ValidatorIndex,
) {
duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorIndices)
if err != nil {
logger.Error("Error fetching validator duties: %v", err)
Expand All @@ -75,11 +115,10 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) {
return
}

minSlot, maxSlot := getSlotRangeForDuties(duties) // Maximum of 32 + 31 = 63 slots max. Will only happen with users with lots of validators.
slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot) // get all attestations for the needed slots
minSlot, maxSlot := getSlotRangeForDuties(duties)
slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot)
committeeSizeCache := make(map[domain.Slot]domain.CommitteeSizeMap)

// Fore each duty (one x validator), check if there is an attestation for the duty slot in the next 32 slots.
for _, duty := range duties {
attestationFound := a.checkDutyAttestation(ctx, duty, slotAttestations, committeeSizeCache)
if !attestationFound {
Expand All @@ -90,7 +129,7 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) {
}
}

func (a *AttestationChecker) getValidatorsToCheck(indices []domain.ValidatorIndex, epoch domain.Epoch) []domain.ValidatorIndex {
func (a *DutiesChecker) getValidatorsToCheck(indices []domain.ValidatorIndex, epoch domain.Epoch) []domain.ValidatorIndex {
var result []domain.ValidatorIndex
for _, index := range indices {
if a.wasCheckedThisEpoch(index, epoch) {
Expand All @@ -101,11 +140,11 @@ func (a *AttestationChecker) getValidatorsToCheck(indices []domain.ValidatorInde
return result
}

func (a *AttestationChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool {
func (a *DutiesChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool {
return a.CheckedEpochs[index] == epoch
}

func (a *AttestationChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) {
func (a *DutiesChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) {
if a.CheckedEpochs == nil {
a.CheckedEpochs = make(map[domain.ValidatorIndex]domain.Epoch)
}
Expand Down Expand Up @@ -141,7 +180,7 @@ func preloadSlotAttestations(ctx context.Context, beacon ports.BeaconChainAdapte

// checkDutyAttestation checks if there is an attestation for the given duty in the next 32 slots.
// It uses the committee size cache to avoid fetching committee sizes for every duty in repeated slots.
func (a *AttestationChecker) checkDutyAttestation(
func (a *DutiesChecker) checkDutyAttestation(
ctx context.Context,
duty domain.ValidatorDuty,
slotAttestations map[domain.Slot][]domain.Attestation,
Expand Down