diff --git a/cmd/main.go b/cmd/main.go index 55dda7d..4a9f5ca 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -50,10 +50,10 @@ func main() { // Start the attestation checker service in a goroutine logger.Info("Starting attestation checker for %d validators", len(indices)) checker := &services.AttestationChecker{ - BeaconAdapter: adapter, - ValidatorIndices: indices, - PollInterval: 1 * time.Minute, - CheckedValidators: make(map[domain.ValidatorIndex]domain.Epoch), + BeaconAdapter: adapter, + ValidatorIndices: indices, + PollInterval: 1 * time.Minute, + CheckedEpochs: make(map[domain.ValidatorIndex]domain.Epoch), } wg.Add(1) go func() { diff --git a/internal/application/domain/validator.go b/internal/application/domain/validator.go index 69501d4..29fea9a 100644 --- a/internal/application/domain/validator.go +++ b/internal/application/domain/validator.go @@ -5,7 +5,6 @@ type Slot uint64 type ValidatorIndex uint64 type CommitteeIndex uint64 -// domain/domain.go type ValidatorDuty struct { Slot Slot CommitteeIndex CommitteeIndex diff --git a/internal/application/services/attestation_oracle.go b/internal/application/services/attestation_oracle.go index 5ad7706..5b1dc3a 100644 --- a/internal/application/services/attestation_oracle.go +++ b/internal/application/services/attestation_oracle.go @@ -15,9 +15,10 @@ type AttestationChecker struct { PollInterval time.Duration lastFinalizedEpoch domain.Epoch - CheckedValidators map[domain.ValidatorIndex]domain.Epoch + CheckedEpochs map[domain.ValidatorIndex]domain.Epoch // latest epoch checked for each validator index } +// If at interval, ticker ticks but check has not ended, we wont start a new check, we will just wait for the next tick. func (a *AttestationChecker) Run(ctx context.Context) { ticker := time.NewTicker(a.PollInterval) defer ticker.Stop() @@ -32,7 +33,6 @@ func (a *AttestationChecker) Run(ctx context.Context) { } } -// checkLatestFinalizedEpoch checks the latest finalized epoch and verifies if validators have attested. func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) { finalizedEpoch, err := a.BeaconAdapter.GetFinalizedEpoch(ctx) if err != nil { @@ -40,104 +40,143 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) { return } - // If finalized epoch is same as last seen, skip execution if finalizedEpoch == a.lastFinalizedEpoch { logger.Debug("Finalized epoch %d unchanged, skipping check.", finalizedEpoch) return } - logger.Info("New finalized epoch %d detected. Checking attestations for validators...", finalizedEpoch) a.lastFinalizedEpoch = finalizedEpoch + logger.Info("New finalized epoch %d detected. Checking attestations for validators...", finalizedEpoch) - // Determine which validators still need checking for this epoch - var validatorsToCheck []domain.ValidatorIndex - for _, index := range a.ValidatorIndices { - // Skip if already confirmed - if epoch, ok := a.CheckedValidators[index]; ok && epoch == finalizedEpoch { - logger.Debug("Duties of validator %d already successfully checked for epoch %d. Skipping.", index, finalizedEpoch) - continue - } - validatorsToCheck = append(validatorsToCheck, index) - } - - // If no validators to check, skip - if len(validatorsToCheck) == 0 { - logger.Info("All validators already checked for epoch %d. Skipping.", finalizedEpoch) + validatorIndices := a.getValidatorsToCheck(finalizedEpoch) + if len(validatorIndices) == 0 { return } - // Fetch all duties in batch - duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorsToCheck) + duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorIndices) if err != nil { logger.Error("Error fetching validator duties: %v", err) return } - // For each duty, check attestation inclusion + // This should never happen, we got here if we had no errors getting validator duties and there are validators to check. + if len(duties) == 0 { + logger.Warn("No duties found for finalized epoch %d. This should not happen!", finalizedEpoch) + return + } + + minSlot, maxSlot := getSlotRangeForDuties(duties) // Maximum of 32 + 31 = 63 slots max. Will only happen with users with lots of validators. + slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot) // get all attestations for the needed slots + committeeSizeCache := make(map[domain.Slot]domain.CommitteeSizeMap) + + // Fore each duty (one x validator), check if there is an attestation for the duty slot in the next 32 slots. for _, duty := range duties { - attestationFound := false + attestationFound := a.checkDutyAttestation(ctx, duty, slotAttestations, committeeSizeCache) + if !attestationFound { + logger.Warn(" ❌ No attestation found for validator %d in finalized epoch %d", + duty.ValidatorIndex, finalizedEpoch) + } + a.markCheckedThisEpoch(duty.ValidatorIndex, finalizedEpoch) + } +} - logger.Debug("Checking duty for validator %d in committee %d of slot %d", - duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot) +func (a *AttestationChecker) getValidatorsToCheck(epoch domain.Epoch) []domain.ValidatorIndex { + var result []domain.ValidatorIndex + for _, index := range a.ValidatorIndices { + if a.wasCheckedThisEpoch(index, epoch) { + continue + } + result = append(result, index) + } + return result +} + +func (a *AttestationChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool { + return a.CheckedEpochs[index] == epoch +} - // Check up to 32 slots after the duty slot - for slot := duty.Slot + 1; slot <= duty.Slot+32; slot++ { - attestations, err := a.BeaconAdapter.GetBlockAttestations(ctx, slot) - if err != nil { - logger.Warn("Error fetching attestations for slot %d: %v", slot, err) +func (a *AttestationChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) { + if a.CheckedEpochs == nil { + a.CheckedEpochs = make(map[domain.ValidatorIndex]domain.Epoch) + } + a.CheckedEpochs[index] = epoch +} + +// Important: This function assumes duties is not empty (at least one duty exists). +func getSlotRangeForDuties(duties []domain.ValidatorDuty) (domain.Slot, domain.Slot) { + minSlot, maxSlot := duties[0].Slot, duties[0].Slot + for _, d := range duties { + if d.Slot < minSlot { + minSlot = d.Slot + } + if d.Slot > maxSlot { + maxSlot = d.Slot + } + } + return minSlot, maxSlot +} + +func preloadSlotAttestations(ctx context.Context, beacon ports.BeaconChainAdapter, minSlot, maxSlot domain.Slot) map[domain.Slot][]domain.Attestation { + result := make(map[domain.Slot][]domain.Attestation) + for slot := minSlot + 1; slot <= maxSlot+32; slot++ { + att, err := beacon.GetBlockAttestations(ctx, slot) + if err != nil { + logger.Warn("Error fetching attestations for slot %d: %v. Was this slot missed?", slot, err) + continue + } + result[slot] = att + } + return result +} + +// checkDutyAttestation checks if there is an attestation for the given duty in the next 32 slots. +// It uses the committee size cache to avoid fetching committee sizes for every duty in repeated slots. +func (a *AttestationChecker) checkDutyAttestation( + ctx context.Context, + duty domain.ValidatorDuty, + slotAttestations map[domain.Slot][]domain.Attestation, + committeeSizeCache map[domain.Slot]domain.CommitteeSizeMap, +) bool { + committeeSizeMap, ok := committeeSizeCache[duty.Slot] + if !ok { + var err error + committeeSizeMap, err = a.BeaconAdapter.GetCommitteeSizeMap(ctx, duty.Slot) + if err != nil { + logger.Warn("Error fetching committee sizes for slot %d: %v", duty.Slot, err) + return false + } + committeeSizeCache[duty.Slot] = committeeSizeMap + } + + for slot := duty.Slot + 1; slot <= duty.Slot+32; slot++ { + attestations := slotAttestations[slot] + for _, att := range attestations { + if att.DataSlot != duty.Slot { continue } - // Fetch committee sizes for this block slot - committeeSizeMap, err := a.BeaconAdapter.GetCommitteeSizeMap(ctx, duty.Slot) - if err != nil { - logger.Warn("Error fetching committee sizes for slot %d: %v", slot, err) + if !isBitSet(att.CommitteeBits, int(duty.CommitteeIndex)) { continue } - - for _, att := range attestations { - if att.DataSlot != duty.Slot { - continue - } - if !isBitSet(att.CommitteeBits, int(duty.CommitteeIndex)) { - continue - } - - // 🟩 Compute bit position dynamically based on committeeBits - bitPosition := computeBitPosition( - duty.CommitteeIndex, - duty.ValidatorCommitteeIdx, - att.CommitteeBits, - committeeSizeMap, - ) - - if !isBitSet(att.AggregationBits, bitPosition) { - logger.Debug(" ❌ Validator %d not included in attestation for committee %d at slot %d (bit position %d).", - duty.ValidatorIndex, duty.CommitteeIndex, slot, bitPosition) - continue - } - - // ✅ Attestation found! - logger.Info("✅ Validator %d attested in committee %d for duty slot %d (included in block slot %d)", - duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot, slot) - attestationFound = true - break // Found; no need to check more attestations in this block - } - if attestationFound { - break // Move on to next validator + bitPosition := computeBitPosition( + duty.CommitteeIndex, + duty.ValidatorCommitteeIdx, + att.CommitteeBits, + committeeSizeMap, + ) + if !isBitSet(att.AggregationBits, bitPosition) { + continue } + logger.Info("✅ Validator %d attested in committee %d for duty slot %d (included in block slot %d)", + duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot, slot) + return true } - - if !attestationFound { - logger.Warn(" ❌ No attestation found for validator %d in finalized epoch %d", - duty.ValidatorIndex, finalizedEpoch) - } - - // Mark validator as checked for this epoch - a.CheckedValidators[duty.ValidatorIndex] = finalizedEpoch } + return false } -// Compute the bit position of the validator in the aggregation_bits +// computeBitPosition calculates the bit position for the validator in the committee bits. +// It sums the sizes of all committees before the one the validator is in, and adds the validator's index in that committee. +// This is used to determine if the validator's aggregation bit is set in the attestation. func computeBitPosition( validatorCommitteeIndex domain.CommitteeIndex, validatorIndexInCommittee uint64, @@ -146,26 +185,23 @@ func computeBitPosition( ) int { bitPosition := 0 for i := 0; i < 64; i++ { - if !isBitSet(committeeBits, i) { + if !isBitSet(committeeBits, i) { // if the committee bit is not set, dont add its size to final bit position continue } - if i == int(validatorCommitteeIndex) { + if i == int(validatorCommitteeIndex) { // We got to the committee of the validator, we can stop here. break } - bitPosition += committeeSizeMap[domain.CommitteeIndex(i)] + bitPosition += committeeSizeMap[domain.CommitteeIndex(i)] // Add the size of the committee to the bit position. Bit was set and it's not the committee of the validator. } bitPosition += int(validatorIndexInCommittee) return bitPosition } -// isBitSet checks if a bit at a particular index is set in a bitfield func isBitSet(bits []byte, index int) bool { byteIndex := index / 8 bitIndex := index % 8 - if byteIndex >= len(bits) { return false } - return (bits[byteIndex] & (1 << uint(bitIndex))) != 0 }