Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func main() {
// Start the attestation checker service in a goroutine
logger.Info("Starting attestation checker for %d validators", len(indices))
checker := &services.AttestationChecker{
BeaconAdapter: adapter,
ValidatorIndices: indices,
PollInterval: 1 * time.Minute,
CheckedValidators: make(map[domain.ValidatorIndex]domain.Epoch),
BeaconAdapter: adapter,
ValidatorIndices: indices,
PollInterval: 1 * time.Minute,
CheckedEpochs: make(map[domain.ValidatorIndex]domain.Epoch),
}
wg.Add(1)
go func() {
Expand Down
1 change: 0 additions & 1 deletion internal/application/domain/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ type Slot uint64
type ValidatorIndex uint64
type CommitteeIndex uint64

// domain/domain.go
type ValidatorDuty struct {
Slot Slot
CommitteeIndex CommitteeIndex
Expand Down
194 changes: 115 additions & 79 deletions internal/application/services/attestation_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type AttestationChecker struct {
PollInterval time.Duration

lastFinalizedEpoch domain.Epoch
CheckedValidators map[domain.ValidatorIndex]domain.Epoch
CheckedEpochs map[domain.ValidatorIndex]domain.Epoch // latest epoch checked for each validator index
}

// If at interval, ticker ticks but check has not ended, we wont start a new check, we will just wait for the next tick.
func (a *AttestationChecker) Run(ctx context.Context) {
ticker := time.NewTicker(a.PollInterval)
defer ticker.Stop()
Expand All @@ -32,112 +33,150 @@ func (a *AttestationChecker) Run(ctx context.Context) {
}
}

// checkLatestFinalizedEpoch checks the latest finalized epoch and verifies if validators have attested.
func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) {
finalizedEpoch, err := a.BeaconAdapter.GetFinalizedEpoch(ctx)
if err != nil {
logger.Error("Error fetching finalized epoch: %v", err)
return
}

// If finalized epoch is same as last seen, skip execution
if finalizedEpoch == a.lastFinalizedEpoch {
logger.Debug("Finalized epoch %d unchanged, skipping check.", finalizedEpoch)
return
}

logger.Info("New finalized epoch %d detected. Checking attestations for validators...", finalizedEpoch)
a.lastFinalizedEpoch = finalizedEpoch
logger.Info("New finalized epoch %d detected. Checking attestations for validators...", finalizedEpoch)

// Determine which validators still need checking for this epoch
var validatorsToCheck []domain.ValidatorIndex
for _, index := range a.ValidatorIndices {
// Skip if already confirmed
if epoch, ok := a.CheckedValidators[index]; ok && epoch == finalizedEpoch {
logger.Debug("Duties of validator %d already successfully checked for epoch %d. Skipping.", index, finalizedEpoch)
continue
}
validatorsToCheck = append(validatorsToCheck, index)
}

// If no validators to check, skip
if len(validatorsToCheck) == 0 {
logger.Info("All validators already checked for epoch %d. Skipping.", finalizedEpoch)
validatorIndices := a.getValidatorsToCheck(finalizedEpoch)
if len(validatorIndices) == 0 {
return
}

// Fetch all duties in batch
duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorsToCheck)
duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorIndices)
if err != nil {
logger.Error("Error fetching validator duties: %v", err)
return
}

// For each duty, check attestation inclusion
// This should never happen, we got here if we had no errors getting validator duties and there are validators to check.
if len(duties) == 0 {
logger.Warn("No duties found for finalized epoch %d. This should not happen!", finalizedEpoch)
return
}

minSlot, maxSlot := getSlotRangeForDuties(duties) // Maximum of 32 + 31 = 63 slots max. Will only happen with users with lots of validators.
slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot) // get all attestations for the needed slots
committeeSizeCache := make(map[domain.Slot]domain.CommitteeSizeMap)

// Fore each duty (one x validator), check if there is an attestation for the duty slot in the next 32 slots.
for _, duty := range duties {
attestationFound := false
attestationFound := a.checkDutyAttestation(ctx, duty, slotAttestations, committeeSizeCache)
if !attestationFound {
logger.Warn(" ❌ No attestation found for validator %d in finalized epoch %d",
duty.ValidatorIndex, finalizedEpoch)
}
a.markCheckedThisEpoch(duty.ValidatorIndex, finalizedEpoch)
}
}

logger.Debug("Checking duty for validator %d in committee %d of slot %d",
duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot)
func (a *AttestationChecker) getValidatorsToCheck(epoch domain.Epoch) []domain.ValidatorIndex {
var result []domain.ValidatorIndex
for _, index := range a.ValidatorIndices {
if a.wasCheckedThisEpoch(index, epoch) {
continue
}
result = append(result, index)
}
return result
}

func (a *AttestationChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool {
return a.CheckedEpochs[index] == epoch
}

// Check up to 32 slots after the duty slot
for slot := duty.Slot + 1; slot <= duty.Slot+32; slot++ {
attestations, err := a.BeaconAdapter.GetBlockAttestations(ctx, slot)
if err != nil {
logger.Warn("Error fetching attestations for slot %d: %v", slot, err)
func (a *AttestationChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) {
if a.CheckedEpochs == nil {
a.CheckedEpochs = make(map[domain.ValidatorIndex]domain.Epoch)
}
a.CheckedEpochs[index] = epoch
}

// Important: This function assumes duties is not empty (at least one duty exists).
func getSlotRangeForDuties(duties []domain.ValidatorDuty) (domain.Slot, domain.Slot) {
minSlot, maxSlot := duties[0].Slot, duties[0].Slot
for _, d := range duties {
if d.Slot < minSlot {
minSlot = d.Slot
}
if d.Slot > maxSlot {
maxSlot = d.Slot
}
}
return minSlot, maxSlot
}

func preloadSlotAttestations(ctx context.Context, beacon ports.BeaconChainAdapter, minSlot, maxSlot domain.Slot) map[domain.Slot][]domain.Attestation {
result := make(map[domain.Slot][]domain.Attestation)
for slot := minSlot + 1; slot <= maxSlot+32; slot++ {
att, err := beacon.GetBlockAttestations(ctx, slot)
if err != nil {
logger.Warn("Error fetching attestations for slot %d: %v. Was this slot missed?", slot, err)
continue
}
result[slot] = att
}
return result
}

// checkDutyAttestation checks if there is an attestation for the given duty in the next 32 slots.
// It uses the committee size cache to avoid fetching committee sizes for every duty in repeated slots.
func (a *AttestationChecker) checkDutyAttestation(
ctx context.Context,
duty domain.ValidatorDuty,
slotAttestations map[domain.Slot][]domain.Attestation,
committeeSizeCache map[domain.Slot]domain.CommitteeSizeMap,
) bool {
committeeSizeMap, ok := committeeSizeCache[duty.Slot]
if !ok {
var err error
committeeSizeMap, err = a.BeaconAdapter.GetCommitteeSizeMap(ctx, duty.Slot)
if err != nil {
logger.Warn("Error fetching committee sizes for slot %d: %v", duty.Slot, err)
return false
}
committeeSizeCache[duty.Slot] = committeeSizeMap
}

for slot := duty.Slot + 1; slot <= duty.Slot+32; slot++ {
attestations := slotAttestations[slot]
for _, att := range attestations {
if att.DataSlot != duty.Slot {
continue
}
// Fetch committee sizes for this block slot
committeeSizeMap, err := a.BeaconAdapter.GetCommitteeSizeMap(ctx, duty.Slot)
if err != nil {
logger.Warn("Error fetching committee sizes for slot %d: %v", slot, err)
if !isBitSet(att.CommitteeBits, int(duty.CommitteeIndex)) {
continue
}

for _, att := range attestations {
if att.DataSlot != duty.Slot {
continue
}
if !isBitSet(att.CommitteeBits, int(duty.CommitteeIndex)) {
continue
}

// 🟩 Compute bit position dynamically based on committeeBits
bitPosition := computeBitPosition(
duty.CommitteeIndex,
duty.ValidatorCommitteeIdx,
att.CommitteeBits,
committeeSizeMap,
)

if !isBitSet(att.AggregationBits, bitPosition) {
logger.Debug(" ❌ Validator %d not included in attestation for committee %d at slot %d (bit position %d).",
duty.ValidatorIndex, duty.CommitteeIndex, slot, bitPosition)
continue
}

// ✅ Attestation found!
logger.Info("✅ Validator %d attested in committee %d for duty slot %d (included in block slot %d)",
duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot, slot)
attestationFound = true
break // Found; no need to check more attestations in this block
}
if attestationFound {
break // Move on to next validator
bitPosition := computeBitPosition(
duty.CommitteeIndex,
duty.ValidatorCommitteeIdx,
att.CommitteeBits,
committeeSizeMap,
)
if !isBitSet(att.AggregationBits, bitPosition) {
continue
}
logger.Info("✅ Validator %d attested in committee %d for duty slot %d (included in block slot %d)",
duty.ValidatorIndex, duty.CommitteeIndex, duty.Slot, slot)
return true
}

if !attestationFound {
logger.Warn(" ❌ No attestation found for validator %d in finalized epoch %d",
duty.ValidatorIndex, finalizedEpoch)
}

// Mark validator as checked for this epoch
a.CheckedValidators[duty.ValidatorIndex] = finalizedEpoch
}
return false
}

// Compute the bit position of the validator in the aggregation_bits
// computeBitPosition calculates the bit position for the validator in the committee bits.
// It sums the sizes of all committees before the one the validator is in, and adds the validator's index in that committee.
// This is used to determine if the validator's aggregation bit is set in the attestation.
func computeBitPosition(
validatorCommitteeIndex domain.CommitteeIndex,
validatorIndexInCommittee uint64,
Expand All @@ -146,26 +185,23 @@ func computeBitPosition(
) int {
bitPosition := 0
for i := 0; i < 64; i++ {
if !isBitSet(committeeBits, i) {
if !isBitSet(committeeBits, i) { // if the committee bit is not set, dont add its size to final bit position
continue
}
if i == int(validatorCommitteeIndex) {
if i == int(validatorCommitteeIndex) { // We got to the committee of the validator, we can stop here.
break
}
bitPosition += committeeSizeMap[domain.CommitteeIndex(i)]
bitPosition += committeeSizeMap[domain.CommitteeIndex(i)] // Add the size of the committee to the bit position. Bit was set and it's not the committee of the validator.
}
bitPosition += int(validatorIndexInCommittee)
return bitPosition
}

// isBitSet checks if a bit at a particular index is set in a bitfield
func isBitSet(bits []byte, index int) bool {
byteIndex := index / 8
bitIndex := index % 8

if byteIndex >= len(bits) {
return false
}

return (bits[byteIndex] & (1 << uint(bitIndex))) != 0
}