diff --git a/go.mod b/go.mod index 3ed40dd..a6dacce 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/dappnode/validator-tracker go 1.24.3 require ( - github.com/attestantio/go-eth2-client v0.25.2 + github.com/attestantio/go-eth2-client v0.26.0 github.com/rs/zerolog v1.34.0 ) diff --git a/go.sum b/go.sum index 8743cfe..84f2f8c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/attestantio/go-eth2-client v0.25.2 h1:BHOva0HlJZ47HwALQuqqfIAQ6gRIo5P/iqGpphrMsCE= -github.com/attestantio/go-eth2-client v0.25.2/go.mod h1:fvULSL9WtNskkOB4i+Yyr6BKpNHXvmpGZj9969fCrfY= +github.com/attestantio/go-eth2-client v0.26.0 h1:oDWKvIUJfvr1EBi/w9L6mawYZHOCymjHkml7fZplT20= +github.com/attestantio/go-eth2-client v0.26.0/go.mod h1:fvULSL9WtNskkOB4i+Yyr6BKpNHXvmpGZj9969fCrfY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= diff --git a/internal/adapters/attestantclient_adapter.go b/internal/adapters/beaconattestant_adapter.go similarity index 87% rename from internal/adapters/attestantclient_adapter.go rename to internal/adapters/beaconattestant_adapter.go index 4061840..07477b2 100644 --- a/internal/adapters/attestantclient_adapter.go +++ b/internal/adapters/beaconattestant_adapter.go @@ -51,6 +51,15 @@ func (b *beaconAttestantClient) GetFinalizedEpoch(ctx context.Context) (domain.E return domain.Epoch(finality.Data.Finalized.Epoch), nil } +// GetJustifiedEpoch retrieves the latest finalized epoch from the beacon chain. +func (b *beaconAttestantClient) GetJustifiedEpoch(ctx context.Context) (domain.Epoch, error) { + finality, err := b.client.Finality(ctx, &api.FinalityOpts{State: "head"}) + if err != nil { + return 0, err + } + return domain.Epoch(finality.Data.Justified.Epoch), nil +} + // internal/adapters/beaconchain_adapter.go func (b *beaconAttestantClient) GetValidatorDutiesBatch(ctx context.Context, epoch domain.Epoch, validatorIndices []domain.ValidatorIndex) ([]domain.ValidatorDuty, error) { // Convert to phase0.ValidatorIndex @@ -226,3 +235,25 @@ func (b *beaconAttestantClient) DidProposeBlock(ctx context.Context, slot domain } return block != nil && block.Data != nil, nil } + +func (b *beaconAttestantClient) GetValidatorsLiveness(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error) { + // Convert to phase0.ValidatorIndex + var beaconIndices []phase0.ValidatorIndex + for _, idx := range indices { + beaconIndices = append(beaconIndices, phase0.ValidatorIndex(idx)) + } + + liveness, err := b.client.ValidatorLiveness(ctx, &api.ValidatorLivenessOpts{ + Epoch: phase0.Epoch(epoch), + Indices: beaconIndices, + }) + if err != nil { + return nil, err + } + + livenessMap := make(map[domain.ValidatorIndex]bool) + for _, v := range liveness.Data { + livenessMap[domain.ValidatorIndex(v.Index)] = v.IsLive + } + return livenessMap, nil +} diff --git a/internal/application/ports/beaconchain.go b/internal/application/ports/beaconchain.go index b897f6f..d320388 100644 --- a/internal/application/ports/beaconchain.go +++ b/internal/application/ports/beaconchain.go @@ -9,6 +9,7 @@ import ( // ports/beaconchain_adapter.go type BeaconChainAdapter interface { GetFinalizedEpoch(ctx context.Context) (domain.Epoch, error) + GetJustifiedEpoch(ctx context.Context) (domain.Epoch, error) GetValidatorDutiesBatch(ctx context.Context, epoch domain.Epoch, validatorIndices []domain.ValidatorIndex) ([]domain.ValidatorDuty, error) GetCommitteeSizeMap(ctx context.Context, slot domain.Slot) (domain.CommitteeSizeMap, error) GetBlockAttestations(ctx context.Context, slot domain.Slot) ([]domain.Attestation, error) @@ -16,4 +17,6 @@ type BeaconChainAdapter interface { GetProposerDuties(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) ([]domain.ProposerDuty, error) DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error) + + GetValidatorsLiveness(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error) } diff --git a/internal/application/services/dutieschecker_service.go b/internal/application/services/dutieschecker_service.go index c276567..a533e56 100644 --- a/internal/application/services/dutieschecker_service.go +++ b/internal/application/services/dutieschecker_service.go @@ -14,7 +14,7 @@ type DutiesChecker struct { Web3SignerAdapter ports.Web3SignerAdapter PollInterval time.Duration - lastFinalizedEpoch domain.Epoch + lastJustifiedEpoch domain.Epoch CheckedEpochs map[domain.ValidatorIndex]domain.Epoch // latest epoch checked for each validator index } @@ -26,25 +26,25 @@ func (a *DutiesChecker) Run(ctx context.Context) { for { select { case <-ticker.C: - a.checkLatestFinalizedEpoch(ctx) + a.chechLatestJustifiedEpoch(ctx) case <-ctx.Done(): return } } } -func (a *DutiesChecker) checkLatestFinalizedEpoch(ctx context.Context) { - finalizedEpoch, err := a.BeaconAdapter.GetFinalizedEpoch(ctx) +func (a *DutiesChecker) chechLatestJustifiedEpoch(ctx context.Context) { + justifiedEpoch, err := a.BeaconAdapter.GetJustifiedEpoch(ctx) if err != nil { - logger.Error("Error fetching finalized epoch: %v", err) + logger.Error("Error fetching justified epoch: %v", err) return } - if finalizedEpoch == a.lastFinalizedEpoch { - logger.Debug("Finalized epoch %d unchanged, skipping check.", finalizedEpoch) + if justifiedEpoch == a.lastJustifiedEpoch { + logger.Debug("Justified epoch %d unchanged, skipping check.", justifiedEpoch) return } - a.lastFinalizedEpoch = finalizedEpoch - logger.Info("New finalized epoch %d detected.", finalizedEpoch) + a.lastJustifiedEpoch = justifiedEpoch + logger.Info("New justified epoch %d detected.", justifiedEpoch) pubkeys, err := a.Web3SignerAdapter.GetValidatorPubkeys() if err != nil { @@ -59,73 +59,88 @@ func (a *DutiesChecker) checkLatestFinalizedEpoch(ctx context.Context) { } logger.Info("Found %d validator indices active", len(indices)) - validatorIndices := a.getValidatorsToCheck(indices, finalizedEpoch) + validatorIndices := a.getValidatorsToCheck(indices, justifiedEpoch) if len(validatorIndices) == 0 { - logger.Debug("No validators left to check for epoch %d", finalizedEpoch) + logger.Debug("No validators left to check for epoch %d", justifiedEpoch) return } - // Split proposal vs attestation logic - a.checkProposals(ctx, finalizedEpoch, validatorIndices) - a.checkAttestations(ctx, finalizedEpoch, validatorIndices) + // Initialize a map to track success of both checks + proposalChecked := make(map[domain.ValidatorIndex]bool) + livenessChecked := make(map[domain.ValidatorIndex]bool) + + a.checkProposals(ctx, justifiedEpoch, validatorIndices, proposalChecked) + a.checkLiveness(ctx, justifiedEpoch, validatorIndices, livenessChecked) + + // Mark validators as checked only if both checks succeeded + for _, index := range validatorIndices { + if proposalChecked[index] && livenessChecked[index] { + a.CheckedEpochs[index] = justifiedEpoch + } + } } -func (a *DutiesChecker) checkProposals( +func (a *DutiesChecker) checkLiveness( ctx context.Context, - finalizedEpoch domain.Epoch, + epochToTrack domain.Epoch, indices []domain.ValidatorIndex, + livenessChecked map[domain.ValidatorIndex]bool, ) { - proposerDuties, err := a.BeaconAdapter.GetProposerDuties(ctx, finalizedEpoch, indices) - if err != nil { - logger.Error("Error fetching proposer duties: %v", err) + if len(indices) == 0 { + logger.Warn("No validators to check liveness for in epoch %d", epochToTrack) return } - if len(proposerDuties) == 0 { - logger.Warn("No proposer duties found for finalized epoch %d.", finalizedEpoch) + livenessMap, err := a.BeaconAdapter.GetValidatorsLiveness(ctx, epochToTrack, indices) + if err != nil { + logger.Error("Error checking liveness for validators: %v", err) return } - for _, duty := range proposerDuties { - didPropose, err := a.BeaconAdapter.DidProposeBlock(ctx, duty.Slot) - if err != nil { - logger.Warn("⚠️ Could not determine if block was proposed at slot %d: %v", duty.Slot, err) + for _, index := range indices { + isLive, ok := livenessMap[index] + if !ok { + logger.Warn("⚠️ Liveness info not found for validator %d in epoch %d", index, epochToTrack) continue } - if didPropose { - logger.Info("✅ Validator %d successfully proposed a block at slot %d", duty.ValidatorIndex, duty.Slot) + livenessChecked[index] = true + if !isLive { + logger.Warn("❌ Validator %d is not live in epoch %d", index, epochToTrack) } else { - logger.Warn("❌ Validator %d was scheduled to propose at slot %d but did not", duty.ValidatorIndex, duty.Slot) + logger.Info("✅ Validator %d is live in epoch %d", index, epochToTrack) } } } -func (a *DutiesChecker) checkAttestations( +func (a *DutiesChecker) checkProposals( ctx context.Context, - finalizedEpoch domain.Epoch, - validatorIndices []domain.ValidatorIndex, + epochToTrack domain.Epoch, + indices []domain.ValidatorIndex, + proposalChecked map[domain.ValidatorIndex]bool, ) { - duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorIndices) + proposerDuties, err := a.BeaconAdapter.GetProposerDuties(ctx, epochToTrack, indices) if err != nil { - logger.Error("Error fetching validator duties: %v", err) + logger.Error("Error fetching proposer duties: %v", err) return } - if len(duties) == 0 { - logger.Warn("No duties found for finalized epoch %d. This should not happen!", finalizedEpoch) + + if len(proposerDuties) == 0 { + logger.Warn("No proposer duties for any validators in epoch %d", epochToTrack) return } - minSlot, maxSlot := getSlotRangeForDuties(duties) - slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot) - committeeSizeCache := make(map[domain.Slot]domain.CommitteeSizeMap) - - for _, duty := range duties { - attestationFound := a.checkDutyAttestation(ctx, duty, slotAttestations, committeeSizeCache) - if !attestationFound { - logger.Warn(" ❌ No attestation found for validator %d in finalized epoch %d", - duty.ValidatorIndex, finalizedEpoch) + for _, duty := range proposerDuties { + didPropose, err := a.BeaconAdapter.DidProposeBlock(ctx, duty.Slot) + if err != nil { + logger.Warn("⚠️ Could not determine if block was proposed at slot %d: %v", duty.Slot, err) + continue + } + proposalChecked[duty.ValidatorIndex] = true + if didPropose { + logger.Info("✅ Validator %d successfully proposed a block at slot %d", duty.ValidatorIndex, duty.Slot) + } else { + logger.Warn("❌ Validator %d was scheduled to propose at slot %d but did not", duty.ValidatorIndex, duty.Slot) } - a.markCheckedThisEpoch(duty.ValidatorIndex, finalizedEpoch) } } @@ -143,114 +158,3 @@ func (a *DutiesChecker) getValidatorsToCheck(indices []domain.ValidatorIndex, ep func (a *DutiesChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool { return a.CheckedEpochs[index] == epoch } - -func (a *DutiesChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) { - if a.CheckedEpochs == nil { - a.CheckedEpochs = make(map[domain.ValidatorIndex]domain.Epoch) - } - a.CheckedEpochs[index] = epoch -} - -// Important: This function assumes duties is not empty (at least one duty exists). -func getSlotRangeForDuties(duties []domain.ValidatorDuty) (domain.Slot, domain.Slot) { - minSlot, maxSlot := duties[0].Slot, duties[0].Slot - for _, d := range duties { - if d.Slot < minSlot { - minSlot = d.Slot - } - if d.Slot > maxSlot { - maxSlot = d.Slot - } - } - return minSlot, maxSlot -} - -func preloadSlotAttestations(ctx context.Context, beacon ports.BeaconChainAdapter, minSlot, maxSlot domain.Slot) map[domain.Slot][]domain.Attestation { - result := make(map[domain.Slot][]domain.Attestation) - for slot := minSlot + 1; slot <= maxSlot+32; slot++ { - att, err := beacon.GetBlockAttestations(ctx, slot) - if err != nil { - logger.Warn("Error fetching attestations for slot %d: %v. Was this slot missed?", slot, err) - continue - } - result[slot] = att - } - return result -} - -// checkDutyAttestation checks if there is an attestation for the given duty in the next 32 slots. -// It uses the committee size cache to avoid fetching committee sizes for every duty in repeated slots. -func (a *DutiesChecker) checkDutyAttestation( - ctx context.Context, - duty domain.ValidatorDuty, - slotAttestations map[domain.Slot][]domain.Attestation, - committeeSizeCache map[domain.Slot]domain.CommitteeSizeMap, -) bool { - committeeSizeMap, ok := committeeSizeCache[duty.Slot] - if !ok { - var err error - committeeSizeMap, err = a.BeaconAdapter.GetCommitteeSizeMap(ctx, duty.Slot) - if err != nil { - logger.Warn("Error fetching committee sizes for slot %d: %v", duty.Slot, err) - return false - } - committeeSizeCache[duty.Slot] = committeeSizeMap - } - - for slot := duty.Slot + 1; slot <= duty.Slot+32; slot++ { - attestations := slotAttestations[slot] - for _, att := range attestations { - if att.DataSlot != duty.Slot { - continue - } - if !isBitSet(att.CommitteeBits, int(duty.CommitteeIndex)) { - continue - } - bitPosition := computeBitPosition( - duty.CommitteeIndex, - duty.ValidatorCommitteeIdx, - att.CommitteeBits, - committeeSizeMap, - ) - if !isBitSet(att.AggregationBits, bitPosition) { - continue - } - logger.Info("✅ Validator %d attested in committee %d for duty slot %d (included in block slot %d)", - duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot, slot) - return true - } - } - return false -} - -// computeBitPosition calculates the bit position for the validator in the committee bits. -// It sums the sizes of all committees before the one the validator is in, and adds the validator's index in that committee. -// This is used to determine if the validator's aggregation bit is set in the attestation. -func computeBitPosition( - validatorCommitteeIndex domain.CommitteeIndex, - validatorIndexInCommittee uint64, - committeeBits []byte, - committeeSizeMap domain.CommitteeSizeMap, -) int { - bitPosition := 0 - for i := 0; i < 64; i++ { - if !isBitSet(committeeBits, i) { // if the committee bit is not set, dont add its size to final bit position - continue - } - if i == int(validatorCommitteeIndex) { // We got to the committee of the validator, we can stop here. - break - } - bitPosition += committeeSizeMap[domain.CommitteeIndex(i)] // Add the size of the committee to the bit position. Bit was set and it's not the committee of the validator. - } - bitPosition += int(validatorIndexInCommittee) - return bitPosition -} - -func isBitSet(bits []byte, index int) bool { - byteIndex := index / 8 - bitIndex := index % 8 - if byteIndex >= len(bits) { - return false - } - return (bits[byteIndex] & (1 << uint(bitIndex))) != 0 -}