Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dappnode/validator-tracker
go 1.24.3

require (
github.com/attestantio/go-eth2-client v0.25.2
github.com/attestantio/go-eth2-client v0.26.0
github.com/rs/zerolog v1.34.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/attestantio/go-eth2-client v0.25.2 h1:BHOva0HlJZ47HwALQuqqfIAQ6gRIo5P/iqGpphrMsCE=
github.com/attestantio/go-eth2-client v0.25.2/go.mod h1:fvULSL9WtNskkOB4i+Yyr6BKpNHXvmpGZj9969fCrfY=
github.com/attestantio/go-eth2-client v0.26.0 h1:oDWKvIUJfvr1EBi/w9L6mawYZHOCymjHkml7fZplT20=
github.com/attestantio/go-eth2-client v0.26.0/go.mod h1:fvULSL9WtNskkOB4i+Yyr6BKpNHXvmpGZj9969fCrfY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ func (b *beaconAttestantClient) GetFinalizedEpoch(ctx context.Context) (domain.E
return domain.Epoch(finality.Data.Finalized.Epoch), nil
}

// GetJustifiedEpoch retrieves the latest finalized epoch from the beacon chain.
func (b *beaconAttestantClient) GetJustifiedEpoch(ctx context.Context) (domain.Epoch, error) {
finality, err := b.client.Finality(ctx, &api.FinalityOpts{State: "head"})
if err != nil {
return 0, err
}
return domain.Epoch(finality.Data.Justified.Epoch), nil
}

// internal/adapters/beaconchain_adapter.go
func (b *beaconAttestantClient) GetValidatorDutiesBatch(ctx context.Context, epoch domain.Epoch, validatorIndices []domain.ValidatorIndex) ([]domain.ValidatorDuty, error) {
// Convert to phase0.ValidatorIndex
Expand Down Expand Up @@ -226,3 +235,25 @@ func (b *beaconAttestantClient) DidProposeBlock(ctx context.Context, slot domain
}
return block != nil && block.Data != nil, nil
}

func (b *beaconAttestantClient) GetValidatorsLiveness(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error) {
// Convert to phase0.ValidatorIndex
var beaconIndices []phase0.ValidatorIndex
for _, idx := range indices {
beaconIndices = append(beaconIndices, phase0.ValidatorIndex(idx))
}

liveness, err := b.client.ValidatorLiveness(ctx, &api.ValidatorLivenessOpts{
Epoch: phase0.Epoch(epoch),
Indices: beaconIndices,
})
if err != nil {
return nil, err
}

livenessMap := make(map[domain.ValidatorIndex]bool)
for _, v := range liveness.Data {
livenessMap[domain.ValidatorIndex(v.Index)] = v.IsLive
}
return livenessMap, nil
}
3 changes: 3 additions & 0 deletions internal/application/ports/beaconchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
// ports/beaconchain_adapter.go
type BeaconChainAdapter interface {
GetFinalizedEpoch(ctx context.Context) (domain.Epoch, error)
GetJustifiedEpoch(ctx context.Context) (domain.Epoch, error)
GetValidatorDutiesBatch(ctx context.Context, epoch domain.Epoch, validatorIndices []domain.ValidatorIndex) ([]domain.ValidatorDuty, error)
GetCommitteeSizeMap(ctx context.Context, slot domain.Slot) (domain.CommitteeSizeMap, error)
GetBlockAttestations(ctx context.Context, slot domain.Slot) ([]domain.Attestation, error)
GetValidatorIndicesByPubkeys(ctx context.Context, pubkeys []string) ([]domain.ValidatorIndex, error)

GetProposerDuties(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) ([]domain.ProposerDuty, error)
DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error)

GetValidatorsLiveness(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error)
}
216 changes: 60 additions & 156 deletions internal/application/services/dutieschecker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type DutiesChecker struct {
Web3SignerAdapter ports.Web3SignerAdapter
PollInterval time.Duration

lastFinalizedEpoch domain.Epoch
lastJustifiedEpoch domain.Epoch
CheckedEpochs map[domain.ValidatorIndex]domain.Epoch // latest epoch checked for each validator index
}

Expand All @@ -26,25 +26,25 @@ func (a *DutiesChecker) Run(ctx context.Context) {
for {
select {
case <-ticker.C:
a.checkLatestFinalizedEpoch(ctx)
a.chechLatestJustifiedEpoch(ctx)
case <-ctx.Done():
return
}
}
}

func (a *DutiesChecker) checkLatestFinalizedEpoch(ctx context.Context) {
finalizedEpoch, err := a.BeaconAdapter.GetFinalizedEpoch(ctx)
func (a *DutiesChecker) chechLatestJustifiedEpoch(ctx context.Context) {
justifiedEpoch, err := a.BeaconAdapter.GetJustifiedEpoch(ctx)
if err != nil {
logger.Error("Error fetching finalized epoch: %v", err)
logger.Error("Error fetching justified epoch: %v", err)
return
}
if finalizedEpoch == a.lastFinalizedEpoch {
logger.Debug("Finalized epoch %d unchanged, skipping check.", finalizedEpoch)
if justifiedEpoch == a.lastJustifiedEpoch {
logger.Debug("Justified epoch %d unchanged, skipping check.", justifiedEpoch)
return
}
a.lastFinalizedEpoch = finalizedEpoch
logger.Info("New finalized epoch %d detected.", finalizedEpoch)
a.lastJustifiedEpoch = justifiedEpoch
logger.Info("New justified epoch %d detected.", justifiedEpoch)

pubkeys, err := a.Web3SignerAdapter.GetValidatorPubkeys()
if err != nil {
Expand All @@ -59,73 +59,88 @@ func (a *DutiesChecker) checkLatestFinalizedEpoch(ctx context.Context) {
}
logger.Info("Found %d validator indices active", len(indices))

validatorIndices := a.getValidatorsToCheck(indices, finalizedEpoch)
validatorIndices := a.getValidatorsToCheck(indices, justifiedEpoch)
if len(validatorIndices) == 0 {
logger.Debug("No validators left to check for epoch %d", finalizedEpoch)
logger.Debug("No validators left to check for epoch %d", justifiedEpoch)
return
}

// Split proposal vs attestation logic
a.checkProposals(ctx, finalizedEpoch, validatorIndices)
a.checkAttestations(ctx, finalizedEpoch, validatorIndices)
// Initialize a map to track success of both checks
proposalChecked := make(map[domain.ValidatorIndex]bool)
livenessChecked := make(map[domain.ValidatorIndex]bool)

a.checkProposals(ctx, justifiedEpoch, validatorIndices, proposalChecked)
a.checkLiveness(ctx, justifiedEpoch, validatorIndices, livenessChecked)

// Mark validators as checked only if both checks succeeded
for _, index := range validatorIndices {
if proposalChecked[index] && livenessChecked[index] {
a.CheckedEpochs[index] = justifiedEpoch
}
}
}

func (a *DutiesChecker) checkProposals(
func (a *DutiesChecker) checkLiveness(
ctx context.Context,
finalizedEpoch domain.Epoch,
epochToTrack domain.Epoch,
indices []domain.ValidatorIndex,
livenessChecked map[domain.ValidatorIndex]bool,
) {
proposerDuties, err := a.BeaconAdapter.GetProposerDuties(ctx, finalizedEpoch, indices)
if err != nil {
logger.Error("Error fetching proposer duties: %v", err)
if len(indices) == 0 {
logger.Warn("No validators to check liveness for in epoch %d", epochToTrack)
return
}

if len(proposerDuties) == 0 {
logger.Warn("No proposer duties found for finalized epoch %d.", finalizedEpoch)
livenessMap, err := a.BeaconAdapter.GetValidatorsLiveness(ctx, epochToTrack, indices)
if err != nil {
logger.Error("Error checking liveness for validators: %v", err)
return
}

for _, duty := range proposerDuties {
didPropose, err := a.BeaconAdapter.DidProposeBlock(ctx, duty.Slot)
if err != nil {
logger.Warn("⚠️ Could not determine if block was proposed at slot %d: %v", duty.Slot, err)
for _, index := range indices {
isLive, ok := livenessMap[index]
if !ok {
logger.Warn("⚠️ Liveness info not found for validator %d in epoch %d", index, epochToTrack)
continue
}
if didPropose {
logger.Info("✅ Validator %d successfully proposed a block at slot %d", duty.ValidatorIndex, duty.Slot)
livenessChecked[index] = true
if !isLive {
logger.Warn("❌ Validator %d is not live in epoch %d", index, epochToTrack)
} else {
logger.Warn("❌ Validator %d was scheduled to propose at slot %d but did not", duty.ValidatorIndex, duty.Slot)
logger.Info("✅ Validator %d is live in epoch %d", index, epochToTrack)
}
}
}

func (a *DutiesChecker) checkAttestations(
func (a *DutiesChecker) checkProposals(
ctx context.Context,
finalizedEpoch domain.Epoch,
validatorIndices []domain.ValidatorIndex,
epochToTrack domain.Epoch,
indices []domain.ValidatorIndex,
proposalChecked map[domain.ValidatorIndex]bool,
) {
duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorIndices)
proposerDuties, err := a.BeaconAdapter.GetProposerDuties(ctx, epochToTrack, indices)
if err != nil {
logger.Error("Error fetching validator duties: %v", err)
logger.Error("Error fetching proposer duties: %v", err)
return
}
if len(duties) == 0 {
logger.Warn("No duties found for finalized epoch %d. This should not happen!", finalizedEpoch)

if len(proposerDuties) == 0 {
logger.Warn("No proposer duties for any validators in epoch %d", epochToTrack)
return
}

minSlot, maxSlot := getSlotRangeForDuties(duties)
slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot)
committeeSizeCache := make(map[domain.Slot]domain.CommitteeSizeMap)

for _, duty := range duties {
attestationFound := a.checkDutyAttestation(ctx, duty, slotAttestations, committeeSizeCache)
if !attestationFound {
logger.Warn(" ❌ No attestation found for validator %d in finalized epoch %d",
duty.ValidatorIndex, finalizedEpoch)
for _, duty := range proposerDuties {
didPropose, err := a.BeaconAdapter.DidProposeBlock(ctx, duty.Slot)
if err != nil {
logger.Warn("⚠️ Could not determine if block was proposed at slot %d: %v", duty.Slot, err)
continue
}
proposalChecked[duty.ValidatorIndex] = true
if didPropose {
logger.Info("✅ Validator %d successfully proposed a block at slot %d", duty.ValidatorIndex, duty.Slot)
} else {
logger.Warn("❌ Validator %d was scheduled to propose at slot %d but did not", duty.ValidatorIndex, duty.Slot)
}
a.markCheckedThisEpoch(duty.ValidatorIndex, finalizedEpoch)
}
}

Expand All @@ -143,114 +158,3 @@ func (a *DutiesChecker) getValidatorsToCheck(indices []domain.ValidatorIndex, ep
func (a *DutiesChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool {
return a.CheckedEpochs[index] == epoch
}

func (a *DutiesChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) {
if a.CheckedEpochs == nil {
a.CheckedEpochs = make(map[domain.ValidatorIndex]domain.Epoch)
}
a.CheckedEpochs[index] = epoch
}

// Important: This function assumes duties is not empty (at least one duty exists).
func getSlotRangeForDuties(duties []domain.ValidatorDuty) (domain.Slot, domain.Slot) {
minSlot, maxSlot := duties[0].Slot, duties[0].Slot
for _, d := range duties {
if d.Slot < minSlot {
minSlot = d.Slot
}
if d.Slot > maxSlot {
maxSlot = d.Slot
}
}
return minSlot, maxSlot
}

func preloadSlotAttestations(ctx context.Context, beacon ports.BeaconChainAdapter, minSlot, maxSlot domain.Slot) map[domain.Slot][]domain.Attestation {
result := make(map[domain.Slot][]domain.Attestation)
for slot := minSlot + 1; slot <= maxSlot+32; slot++ {
att, err := beacon.GetBlockAttestations(ctx, slot)
if err != nil {
logger.Warn("Error fetching attestations for slot %d: %v. Was this slot missed?", slot, err)
continue
}
result[slot] = att
}
return result
}

// checkDutyAttestation checks if there is an attestation for the given duty in the next 32 slots.
// It uses the committee size cache to avoid fetching committee sizes for every duty in repeated slots.
func (a *DutiesChecker) checkDutyAttestation(
ctx context.Context,
duty domain.ValidatorDuty,
slotAttestations map[domain.Slot][]domain.Attestation,
committeeSizeCache map[domain.Slot]domain.CommitteeSizeMap,
) bool {
committeeSizeMap, ok := committeeSizeCache[duty.Slot]
if !ok {
var err error
committeeSizeMap, err = a.BeaconAdapter.GetCommitteeSizeMap(ctx, duty.Slot)
if err != nil {
logger.Warn("Error fetching committee sizes for slot %d: %v", duty.Slot, err)
return false
}
committeeSizeCache[duty.Slot] = committeeSizeMap
}

for slot := duty.Slot + 1; slot <= duty.Slot+32; slot++ {
attestations := slotAttestations[slot]
for _, att := range attestations {
if att.DataSlot != duty.Slot {
continue
}
if !isBitSet(att.CommitteeBits, int(duty.CommitteeIndex)) {
continue
}
bitPosition := computeBitPosition(
duty.CommitteeIndex,
duty.ValidatorCommitteeIdx,
att.CommitteeBits,
committeeSizeMap,
)
if !isBitSet(att.AggregationBits, bitPosition) {
continue
}
logger.Info("✅ Validator %d attested in committee %d for duty slot %d (included in block slot %d)",
duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot, slot)
return true
}
}
return false
}

// computeBitPosition calculates the bit position for the validator in the committee bits.
// It sums the sizes of all committees before the one the validator is in, and adds the validator's index in that committee.
// This is used to determine if the validator's aggregation bit is set in the attestation.
func computeBitPosition(
validatorCommitteeIndex domain.CommitteeIndex,
validatorIndexInCommittee uint64,
committeeBits []byte,
committeeSizeMap domain.CommitteeSizeMap,
) int {
bitPosition := 0
for i := 0; i < 64; i++ {
if !isBitSet(committeeBits, i) { // if the committee bit is not set, dont add its size to final bit position
continue
}
if i == int(validatorCommitteeIndex) { // We got to the committee of the validator, we can stop here.
break
}
bitPosition += committeeSizeMap[domain.CommitteeIndex(i)] // Add the size of the committee to the bit position. Bit was set and it's not the committee of the validator.
}
bitPosition += int(validatorIndexInCommittee)
return bitPosition
}

func isBitSet(bits []byte, index int) bool {
byteIndex := index / 8
bitIndex := index % 8
if byteIndex >= len(bits) {
return false
}
return (bits[byteIndex] & (1 << uint(bitIndex))) != 0
}